sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26    TableFunction,
27};
28
/// Query engine that executes SQL directly on `DataTable`
///
/// The engine holds configuration only and is cheap to clone; the table to query is
/// passed to each `execute*` call.
#[derive(Clone)]
pub struct QueryEngine {
    /// Case-insensitivity flag; forwarded to `HashJoinExecutor::new` when JOINs are processed.
    case_insensitive: bool,
    /// Date notation obtained from `get_date_notation()` at construction time; handed to
    /// `ArithmeticEvaluator::with_date_notation` when evaluating expressions.
    date_notation: String,
    /// Behavior configuration supplied via `with_behavior_config`, if any.
    /// NOTE(review): stored but not read in the code visible here - confirm before removing.
    _behavior_config: Option<BehaviorConfig>,
}
36
37impl Default for QueryEngine {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl QueryEngine {
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            case_insensitive: false,
48            date_notation: get_date_notation(),
49            _behavior_config: None,
50        }
51    }
52
53    #[must_use]
54    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55        let case_insensitive = config.case_insensitive_default;
56        // Use get_date_notation() to respect environment variable override
57        let date_notation = get_date_notation();
58        Self {
59            case_insensitive,
60            date_notation,
61            _behavior_config: Some(config),
62        }
63    }
64
65    #[must_use]
66    pub fn with_date_notation(_date_notation: String) -> Self {
67        Self {
68            case_insensitive: false,
69            date_notation: get_date_notation(), // Always use the global function
70            _behavior_config: None,
71        }
72    }
73
74    #[must_use]
75    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76        Self {
77            case_insensitive,
78            date_notation: get_date_notation(),
79            _behavior_config: None,
80        }
81    }
82
83    #[must_use]
84    pub fn with_case_insensitive_and_date_notation(
85        case_insensitive: bool,
86        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
87    ) -> Self {
88        Self {
89            case_insensitive,
90            date_notation: get_date_notation(), // Always use the global function
91            _behavior_config: None,
92        }
93    }
94
95    /// Find a column name similar to the given name using edit distance
96    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97        let columns = table.column_names();
98        let mut best_match: Option<(String, usize)> = None;
99
100        for col in columns {
101            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102            // Only suggest if distance is small (likely a typo)
103            // Allow up to 3 edits for longer names
104            let max_distance = if name.len() > 10 { 3 } else { 2 };
105            if distance <= max_distance {
106                match &best_match {
107                    None => best_match = Some((col, distance)),
108                    Some((_, best_dist)) if distance < *best_dist => {
109                        best_match = Some((col, distance));
110                    }
111                    _ => {}
112                }
113            }
114        }
115
116        best_match.map(|(name, _)| name)
117    }
118
119    /// Calculate Levenshtein edit distance between two strings
120    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121        let len1 = s1.len();
122        let len2 = s2.len();
123        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125        for i in 0..=len1 {
126            matrix[i][0] = i;
127        }
128        for j in 0..=len2 {
129            matrix[0][j] = j;
130        }
131
132        for (i, c1) in s1.chars().enumerate() {
133            for (j, c2) in s2.chars().enumerate() {
134                let cost = usize::from(c1 != c2);
135                matrix[i + 1][j + 1] = std::cmp::min(
136                    matrix[i][j + 1] + 1, // deletion
137                    std::cmp::min(
138                        matrix[i + 1][j] + 1, // insertion
139                        matrix[i][j] + cost,  // substitution
140                    ),
141                );
142            }
143        }
144
145        matrix[len1][len2]
146    }
147
148    /// Check if an expression contains UNNEST function call
149    fn contains_unnest(expr: &SqlExpression) -> bool {
150        match expr {
151            // Direct UNNEST variant
152            SqlExpression::Unnest { .. } => true,
153            SqlExpression::FunctionCall { name, args, .. } => {
154                if name.to_uppercase() == "UNNEST" {
155                    return true;
156                }
157                // Check recursively in function arguments
158                args.iter().any(Self::contains_unnest)
159            }
160            SqlExpression::BinaryOp { left, right, .. } => {
161                Self::contains_unnest(left) || Self::contains_unnest(right)
162            }
163            SqlExpression::Not { expr } => Self::contains_unnest(expr),
164            SqlExpression::CaseExpression {
165                when_branches,
166                else_branch,
167            } => {
168                when_branches.iter().any(|branch| {
169                    Self::contains_unnest(&branch.condition)
170                        || Self::contains_unnest(&branch.result)
171                }) || else_branch
172                    .as_ref()
173                    .map_or(false, |e| Self::contains_unnest(e))
174            }
175            SqlExpression::SimpleCaseExpression {
176                expr,
177                when_branches,
178                else_branch,
179            } => {
180                Self::contains_unnest(expr)
181                    || when_branches.iter().any(|branch| {
182                        Self::contains_unnest(&branch.value)
183                            || Self::contains_unnest(&branch.result)
184                    })
185                    || else_branch
186                        .as_ref()
187                        .map_or(false, |e| Self::contains_unnest(e))
188            }
189            SqlExpression::InList { expr, values } => {
190                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
191            }
192            SqlExpression::NotInList { expr, values } => {
193                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
194            }
195            SqlExpression::Between { expr, lower, upper } => {
196                Self::contains_unnest(expr)
197                    || Self::contains_unnest(lower)
198                    || Self::contains_unnest(upper)
199            }
200            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
201            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
202            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
203            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
204            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
205            SqlExpression::ChainedMethodCall { base, args, .. } => {
206                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
207            }
208            SqlExpression::Unnest { .. } => true, // Direct UNNEST variant
209            _ => false,
210        }
211    }
212
213    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
214    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
215        let (view, _plan) = self.execute_with_plan(table, sql)?;
216        Ok(view)
217    }
218
219    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
220    pub fn execute_statement(
221        &self,
222        table: Arc<DataTable>,
223        statement: SelectStatement,
224    ) -> Result<DataView> {
225        // First process CTEs to build context
226        let mut cte_context = HashMap::new();
227        for cte in &statement.ctes {
228            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
229            // Execute the CTE based on its type
230            let cte_result = match &cte.cte_type {
231                CTEType::Standard(query) => {
232                    // Execute the CTE query (it might reference earlier CTEs)
233                    let view = self.build_view_with_context(
234                        table.clone(),
235                        query.clone(),
236                        &mut cte_context,
237                    )?;
238
239                    // Materialize the view and enrich columns with qualified names
240                    let mut materialized = self.materialize_view(view)?;
241
242                    // Enrich columns with qualified names for proper scoping
243                    for column in materialized.columns_mut() {
244                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
245                        column.source_table = Some(cte.name.clone());
246                    }
247
248                    DataView::new(Arc::new(materialized))
249                }
250                CTEType::Web(web_spec) => {
251                    // Fetch data from URL
252                    use crate::web::http_fetcher::WebDataFetcher;
253
254                    let fetcher = WebDataFetcher::new()?;
255                    // Pass None for query context (no full SQL available in these contexts)
256                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
257
258                    // Enrich columns with qualified names for proper scoping
259                    for column in data_table.columns_mut() {
260                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261                        column.source_table = Some(cte.name.clone());
262                    }
263
264                    // Convert DataTable to DataView
265                    DataView::new(Arc::new(data_table))
266                }
267            };
268            // Store the result in the context for later use
269            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
270            debug!(
271                "QueryEngine: CTE '{}' pre-processed, stored in context",
272                cte.name
273            );
274        }
275
276        // Now process subqueries with CTE context available
277        let mut subquery_executor =
278            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
279        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
280
281        // Build the view with the same CTE context
282        self.build_view_with_context(table, processed_statement, &mut cte_context)
283    }
284
285    /// Execute a statement with provided CTE context (for subqueries)
286    pub fn execute_statement_with_cte_context(
287        &self,
288        table: Arc<DataTable>,
289        statement: SelectStatement,
290        cte_context: &HashMap<String, Arc<DataView>>,
291    ) -> Result<DataView> {
292        // Clone the context so we can add any CTEs from this statement
293        let mut local_context = cte_context.clone();
294
295        // Process any CTEs in this statement (they might be nested)
296        for cte in &statement.ctes {
297            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
298            let cte_result = match &cte.cte_type {
299                CTEType::Standard(query) => {
300                    let view = self.build_view_with_context(
301                        table.clone(),
302                        query.clone(),
303                        &mut local_context,
304                    )?;
305
306                    // Materialize the view and enrich columns with qualified names
307                    let mut materialized = self.materialize_view(view)?;
308
309                    // Enrich columns with qualified names for proper scoping
310                    for column in materialized.columns_mut() {
311                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
312                        column.source_table = Some(cte.name.clone());
313                    }
314
315                    DataView::new(Arc::new(materialized))
316                }
317                CTEType::Web(web_spec) => {
318                    // Fetch data from URL
319                    use crate::web::http_fetcher::WebDataFetcher;
320
321                    let fetcher = WebDataFetcher::new()?;
322                    // Pass None for query context (no full SQL available in these contexts)
323                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
324
325                    // Enrich columns with qualified names for proper scoping
326                    for column in data_table.columns_mut() {
327                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
328                        column.source_table = Some(cte.name.clone());
329                    }
330
331                    // Convert DataTable to DataView
332                    DataView::new(Arc::new(data_table))
333                }
334            };
335            local_context.insert(cte.name.clone(), Arc::new(cte_result));
336        }
337
338        // Process subqueries with the complete context
339        let mut subquery_executor =
340            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
341        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
342
343        // Build the view
344        self.build_view_with_context(table, processed_statement, &mut local_context)
345    }
346
347    /// Execute a query and return both the result and the execution plan
348    pub fn execute_with_plan(
349        &self,
350        table: Arc<DataTable>,
351        sql: &str,
352    ) -> Result<(DataView, ExecutionPlan)> {
353        let mut plan_builder = ExecutionPlanBuilder::new();
354        let start_time = Instant::now();
355
356        // Parse the SQL query
357        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
358        plan_builder.add_detail(format!("Query: {}", sql));
359        let mut parser = Parser::new(sql);
360        let statement = parser
361            .parse()
362            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
363        plan_builder.add_detail(format!("Parsed successfully"));
364        if let Some(from) = &statement.from_table {
365            plan_builder.add_detail(format!("FROM: {}", from));
366        }
367        if statement.where_clause.is_some() {
368            plan_builder.add_detail("WHERE clause present".to_string());
369        }
370        plan_builder.end_step();
371
372        // First process CTEs to build context
373        let mut cte_context = HashMap::new();
374
375        if !statement.ctes.is_empty() {
376            plan_builder.begin_step(
377                StepType::CTE,
378                format!("Process {} CTEs", statement.ctes.len()),
379            );
380
381            for cte in &statement.ctes {
382                let cte_start = Instant::now();
383                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
384
385                let cte_result = match &cte.cte_type {
386                    CTEType::Standard(query) => {
387                        // Add CTE query details
388                        if let Some(from) = &query.from_table {
389                            plan_builder.add_detail(format!("Source: {}", from));
390                        }
391                        if query.where_clause.is_some() {
392                            plan_builder.add_detail("Has WHERE clause".to_string());
393                        }
394                        if query.group_by.is_some() {
395                            plan_builder.add_detail("Has GROUP BY".to_string());
396                        }
397
398                        debug!(
399                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
400                            cte.name,
401                            cte_context.keys().collect::<Vec<_>>()
402                        );
403
404                        // Process subqueries in the CTE's query FIRST
405                        // This allows the subqueries to see all previously defined CTEs
406                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
407                            self.clone(),
408                            table.clone(),
409                            cte_context.clone(),
410                        );
411                        let processed_query = subquery_executor.execute_subqueries(query)?;
412
413                        let view = self.build_view_with_context(
414                            table.clone(),
415                            processed_query,
416                            &mut cte_context,
417                        )?;
418
419                        // Materialize the view and enrich columns with qualified names
420                        let mut materialized = self.materialize_view(view)?;
421
422                        // Enrich columns with qualified names for proper scoping
423                        for column in materialized.columns_mut() {
424                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
425                            column.source_table = Some(cte.name.clone());
426                        }
427
428                        DataView::new(Arc::new(materialized))
429                    }
430                    CTEType::Web(web_spec) => {
431                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
432                        if let Some(format) = &web_spec.format {
433                            plan_builder.add_detail(format!("Format: {:?}", format));
434                        }
435                        if let Some(cache) = web_spec.cache_seconds {
436                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
437                        }
438
439                        // Fetch data from URL
440                        use crate::web::http_fetcher::WebDataFetcher;
441
442                        let fetcher = WebDataFetcher::new()?;
443                        // Pass None for query context - each WEB CTE is independent
444                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
445
446                        // Enrich columns with qualified names for proper scoping
447                        for column in data_table.columns_mut() {
448                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
449                            column.source_table = Some(cte.name.clone());
450                        }
451
452                        // Convert DataTable to DataView
453                        DataView::new(Arc::new(data_table))
454                    }
455                };
456
457                // Record CTE statistics
458                plan_builder.set_rows_out(cte_result.row_count());
459                plan_builder.add_detail(format!(
460                    "Result: {} rows, {} columns",
461                    cte_result.row_count(),
462                    cte_result.column_count()
463                ));
464                plan_builder.add_detail(format!(
465                    "Execution time: {:.3}ms",
466                    cte_start.elapsed().as_secs_f64() * 1000.0
467                ));
468
469                debug!(
470                    "QueryEngine: Storing CTE '{}' in context with {} rows",
471                    cte.name,
472                    cte_result.row_count()
473                );
474                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
475                plan_builder.end_step();
476            }
477
478            plan_builder.add_detail(format!(
479                "All {} CTEs cached in context",
480                statement.ctes.len()
481            ));
482            plan_builder.end_step();
483        }
484
485        // Process subqueries in the statement with CTE context
486        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
487        let mut subquery_executor =
488            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
489
490        // Check if there are subqueries to process
491        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
492            // This is a simplified check - in reality we'd need to walk the AST
493            format!("{:?}", w).contains("Subquery")
494        });
495
496        if has_subqueries {
497            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
498        }
499
500        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
501
502        if has_subqueries {
503            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
504        } else {
505            plan_builder.add_detail("No subqueries to process".to_string());
506        }
507
508        plan_builder.end_step();
509        let result = self.build_view_with_context_and_plan(
510            table,
511            processed_statement,
512            &mut cte_context,
513            &mut plan_builder,
514        )?;
515
516        let total_duration = start_time.elapsed();
517        info!(
518            "Query execution complete: total={:?}, rows={}",
519            total_duration,
520            result.row_count()
521        );
522
523        let plan = plan_builder.build();
524        Ok((result, plan))
525    }
526
527    /// Build a `DataView` from a parsed SQL statement
528    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
529        let mut cte_context = HashMap::new();
530        self.build_view_with_context(table, statement, &mut cte_context)
531    }
532
533    /// Build a DataView from a SelectStatement with CTE context
534    fn build_view_with_context(
535        &self,
536        table: Arc<DataTable>,
537        statement: SelectStatement,
538        cte_context: &mut HashMap<String, Arc<DataView>>,
539    ) -> Result<DataView> {
540        let mut dummy_plan = ExecutionPlanBuilder::new();
541        self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
542    }
543
544    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
545    fn build_view_with_context_and_plan(
546        &self,
547        table: Arc<DataTable>,
548        statement: SelectStatement,
549        cte_context: &mut HashMap<String, Arc<DataView>>,
550        plan: &mut ExecutionPlanBuilder,
551    ) -> Result<DataView> {
552        // First, process any CTEs that aren't already in the context
553        for cte in &statement.ctes {
554            // Skip if already processed (e.g., by execute_select for WEB CTEs)
555            if cte_context.contains_key(&cte.name) {
556                debug!(
557                    "QueryEngine: CTE '{}' already in context, skipping",
558                    cte.name
559                );
560                continue;
561            }
562
563            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
564            debug!(
565                "QueryEngine: Available CTEs for '{}': {:?}",
566                cte.name,
567                cte_context.keys().collect::<Vec<_>>()
568            );
569
570            // Execute the CTE query (it might reference earlier CTEs)
571            let cte_result = match &cte.cte_type {
572                CTEType::Standard(query) => {
573                    let view =
574                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
575
576                    // Materialize the view and enrich columns with qualified names
577                    let mut materialized = self.materialize_view(view)?;
578
579                    // Enrich columns with qualified names for proper scoping
580                    for column in materialized.columns_mut() {
581                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
582                        column.source_table = Some(cte.name.clone());
583                    }
584
585                    DataView::new(Arc::new(materialized))
586                }
587                CTEType::Web(_web_spec) => {
588                    // Web CTEs should have been processed earlier in execute_select
589                    return Err(anyhow!(
590                        "Web CTEs should be processed in execute_select method"
591                    ));
592                }
593            };
594
595            // Store the result in the context for later use
596            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
597            debug!(
598                "QueryEngine: CTE '{}' processed, stored in context",
599                cte.name
600            );
601        }
602
603        // Determine the source table for the main query
604        let source_table = if let Some(ref table_func) = statement.from_function {
605            // Handle table functions like RANGE()
606            debug!("QueryEngine: Processing table function...");
607            match table_func {
608                TableFunction::Generator { name, args } => {
609                    // Use the generator registry to create the table
610                    use crate::sql::generators::GeneratorRegistry;
611
612                    // Create generator registry (could be cached in QueryEngine)
613                    let registry = GeneratorRegistry::new();
614
615                    if let Some(generator) = registry.get(name) {
616                        // Evaluate arguments
617                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
618                            &table,
619                            self.date_notation.clone(),
620                        );
621                        let dummy_row = 0;
622
623                        let mut evaluated_args = Vec::new();
624                        for arg in args {
625                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
626                        }
627
628                        // Generate the table
629                        generator.generate(evaluated_args)?
630                    } else {
631                        return Err(anyhow!("Unknown generator function: {}", name));
632                    }
633                }
634            }
635        } else if let Some(ref subquery) = statement.from_subquery {
636            // Execute the subquery and use its result as the source
637            debug!("QueryEngine: Processing FROM subquery...");
638            let subquery_result =
639                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
640
641            // Convert the DataView to a DataTable for use as source
642            // This materializes the subquery result
643            let materialized = self.materialize_view(subquery_result)?;
644            Arc::new(materialized)
645        } else if let Some(ref table_name) = statement.from_table {
646            // Check if this references a CTE
647            if let Some(cte_view) = cte_context.get(table_name) {
648                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
649                // Materialize the CTE view as a table
650                let mut materialized = self.materialize_view((**cte_view).clone())?;
651
652                // Apply alias to qualified column names if present
653                if let Some(ref alias) = statement.from_alias {
654                    debug!(
655                        "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
656                        alias, table_name
657                    );
658                    for column in materialized.columns_mut() {
659                        // Replace the CTE name with the alias in qualified names
660                        if let Some(ref qualified_name) = column.qualified_name {
661                            if qualified_name.starts_with(&format!("{}.", table_name)) {
662                                column.qualified_name =
663                                    Some(qualified_name.replace(
664                                        &format!("{}.", table_name),
665                                        &format!("{}.", alias),
666                                    ));
667                            }
668                        }
669                        // Update source table to reflect the alias
670                        if column.source_table.as_ref() == Some(table_name) {
671                            column.source_table = Some(alias.clone());
672                        }
673                    }
674                }
675
676                Arc::new(materialized)
677            } else {
678                // Regular table reference - use the provided table
679                table.clone()
680            }
681        } else {
682            // No FROM clause - use the provided table
683            table.clone()
684        };
685
686        // Process JOINs if present
687        let final_table = if !statement.joins.is_empty() {
688            plan.begin_step(
689                StepType::Join,
690                format!("Process {} JOINs", statement.joins.len()),
691            );
692            plan.set_rows_in(source_table.row_count());
693
694            let join_executor = HashJoinExecutor::new(self.case_insensitive);
695            let mut current_table = source_table;
696
697            for (idx, join_clause) in statement.joins.iter().enumerate() {
698                let join_start = Instant::now();
699                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
700                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
701                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
702                plan.add_detail(format!(
703                    "Executing {:?} JOIN on {} condition(s)",
704                    join_clause.join_type,
705                    join_clause.condition.conditions.len()
706                ));
707
708                // Resolve the right table for the join
709                let right_table = match &join_clause.table {
710                    TableSource::Table(name) => {
711                        // Check if it's a CTE reference
712                        if let Some(cte_view) = cte_context.get(name) {
713                            let mut materialized = self.materialize_view((**cte_view).clone())?;
714
715                            // Apply alias to qualified column names if present
716                            if let Some(ref alias) = join_clause.alias {
717                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
718                                for column in materialized.columns_mut() {
719                                    // Replace the CTE name with the alias in qualified names
720                                    if let Some(ref qualified_name) = column.qualified_name {
721                                        if qualified_name.starts_with(&format!("{}.", name)) {
722                                            column.qualified_name = Some(qualified_name.replace(
723                                                &format!("{}.", name),
724                                                &format!("{}.", alias),
725                                            ));
726                                        }
727                                    }
728                                    // Update source table to reflect the alias
729                                    if column.source_table.as_ref() == Some(name) {
730                                        column.source_table = Some(alias.clone());
731                                    }
732                                }
733                            }
734
735                            Arc::new(materialized)
736                        } else {
737                            // For now, we need the actual table data
738                            // In a real implementation, this would load from file
739                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
740                        }
741                    }
742                    TableSource::DerivedTable { query, alias: _ } => {
743                        // Execute the subquery
744                        let subquery_result = self.build_view_with_context(
745                            table.clone(),
746                            *query.clone(),
747                            cte_context,
748                        )?;
749                        let materialized = self.materialize_view(subquery_result)?;
750                        Arc::new(materialized)
751                    }
752                };
753
754                // Execute the join
755                let joined = join_executor.execute_join(
756                    current_table.clone(),
757                    join_clause,
758                    right_table.clone(),
759                )?;
760
761                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
762                plan.set_rows_out(joined.row_count());
763                plan.add_detail(format!("Result: {} rows", joined.row_count()));
764                plan.add_detail(format!(
765                    "Join time: {:.3}ms",
766                    join_start.elapsed().as_secs_f64() * 1000.0
767                ));
768                plan.end_step();
769
770                current_table = Arc::new(joined);
771            }
772
773            plan.set_rows_out(current_table.row_count());
774            plan.add_detail(format!(
775                "Final result after all joins: {} rows",
776                current_table.row_count()
777            ));
778            plan.end_step();
779            current_table
780        } else {
781            source_table
782        };
783
784        // Continue with the existing build_view logic but using final_table
785        self.build_view_internal_with_plan(final_table, statement, plan)
786    }
787
788    /// Materialize a DataView into a new DataTable
789    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
790        let source = view.source();
791        let mut result_table = DataTable::new("derived");
792
793        // Get the visible columns from the view
794        let visible_cols = view.visible_column_indices().to_vec();
795
796        // Copy column definitions
797        for col_idx in &visible_cols {
798            let col = &source.columns[*col_idx];
799            let new_col = DataColumn {
800                name: col.name.clone(),
801                data_type: col.data_type.clone(),
802                nullable: col.nullable,
803                unique_values: col.unique_values,
804                null_count: col.null_count,
805                metadata: col.metadata.clone(),
806                qualified_name: col.qualified_name.clone(), // Preserve qualified name
807                source_table: col.source_table.clone(),     // Preserve source table
808            };
809            result_table.add_column(new_col);
810        }
811
812        // Copy visible rows
813        for row_idx in view.visible_row_indices() {
814            let source_row = &source.rows[*row_idx];
815            let mut new_row = DataRow { values: Vec::new() };
816
817            for col_idx in &visible_cols {
818                new_row.values.push(source_row.values[*col_idx].clone());
819            }
820
821            result_table.add_row(new_row);
822        }
823
824        Ok(result_table)
825    }
826
827    fn build_view_internal(
828        &self,
829        table: Arc<DataTable>,
830        statement: SelectStatement,
831    ) -> Result<DataView> {
832        let mut dummy_plan = ExecutionPlanBuilder::new();
833        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
834    }
835
836    fn build_view_internal_with_plan(
837        &self,
838        table: Arc<DataTable>,
839        statement: SelectStatement,
840        plan: &mut ExecutionPlanBuilder,
841    ) -> Result<DataView> {
842        debug!(
843            "QueryEngine::build_view - select_items: {:?}",
844            statement.select_items
845        );
846        debug!(
847            "QueryEngine::build_view - where_clause: {:?}",
848            statement.where_clause
849        );
850
851        // Start with all rows visible
852        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
853
854        // Apply WHERE clause filtering using recursive evaluator
855        if let Some(where_clause) = &statement.where_clause {
856            let total_rows = table.row_count();
857            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
858            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
859
860            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
861            plan.set_rows_in(total_rows);
862            plan.add_detail(format!("Input: {} rows", total_rows));
863
864            // Add details about WHERE conditions
865            for condition in &where_clause.conditions {
866                plan.add_detail(format!("Condition: {:?}", condition.expr));
867            }
868
869            let filter_start = Instant::now();
870            // Create an evaluation context for caching compiled regexes
871            let mut eval_context = EvaluationContext::new(self.case_insensitive);
872
873            // Filter visible rows based on WHERE clause
874            let mut filtered_rows = Vec::new();
875            for row_idx in visible_rows {
876                // Only log for first few rows to avoid performance impact
877                if row_idx < 3 {
878                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
879                }
880                let mut evaluator =
881                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
882                match evaluator.evaluate(where_clause, row_idx) {
883                    Ok(result) => {
884                        if row_idx < 3 {
885                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
886                        }
887                        if result {
888                            filtered_rows.push(row_idx);
889                        }
890                    }
891                    Err(e) => {
892                        if row_idx < 3 {
893                            debug!(
894                                "QueryEngine: WHERE evaluation error for row {}: {}",
895                                row_idx, e
896                            );
897                        }
898                        // Propagate WHERE clause errors instead of silently ignoring them
899                        return Err(e);
900                    }
901                }
902            }
903
904            // Log regex cache statistics
905            let (compilations, cache_hits) = eval_context.get_stats();
906            if compilations > 0 || cache_hits > 0 {
907                debug!(
908                    "LIKE pattern cache: {} compilations, {} cache hits",
909                    compilations, cache_hits
910                );
911            }
912            visible_rows = filtered_rows;
913            let filter_duration = filter_start.elapsed();
914            info!(
915                "WHERE clause filtering: {} rows -> {} rows in {:?}",
916                total_rows,
917                visible_rows.len(),
918                filter_duration
919            );
920
921            plan.set_rows_out(visible_rows.len());
922            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
923            plan.add_detail(format!(
924                "Filter time: {:.3}ms",
925                filter_duration.as_secs_f64() * 1000.0
926            ));
927            plan.end_step();
928        }
929
930        // Create initial DataView with filtered rows
931        let mut view = DataView::new(table.clone());
932        view = view.with_rows(visible_rows);
933
934        // Handle GROUP BY if present
935        if let Some(group_by_exprs) = &statement.group_by {
936            if !group_by_exprs.is_empty() {
937                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
938
939                plan.begin_step(
940                    StepType::GroupBy,
941                    format!("GROUP BY {} expressions", group_by_exprs.len()),
942                );
943                plan.set_rows_in(view.row_count());
944                plan.add_detail(format!("Input: {} rows", view.row_count()));
945                for expr in group_by_exprs {
946                    plan.add_detail(format!("Group by: {:?}", expr));
947                }
948
949                let group_start = Instant::now();
950                view = self.apply_group_by(
951                    view,
952                    group_by_exprs,
953                    &statement.select_items,
954                    statement.having.as_ref(),
955                    plan,
956                )?;
957
958                plan.set_rows_out(view.row_count());
959                plan.add_detail(format!("Output: {} groups", view.row_count()));
960                plan.add_detail(format!(
961                    "Overall time: {:.3}ms",
962                    group_start.elapsed().as_secs_f64() * 1000.0
963                ));
964                plan.end_step();
965            }
966        } else {
967            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
968            if !statement.select_items.is_empty() {
969                // Check if we have ANY non-star items (not just the first one)
970                let has_non_star_items = statement
971                    .select_items
972                    .iter()
973                    .any(|item| !matches!(item, SelectItem::Star));
974
975                // Apply select items if:
976                // 1. We have computed expressions or explicit columns
977                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
978                if has_non_star_items || statement.select_items.len() > 1 {
979                    view = self.apply_select_items(view, &statement.select_items)?;
980                } else {
981                }
982                // If it's just a single star, no projection needed
983            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
984                debug!("QueryEngine: Using legacy columns path");
985                // Fallback to legacy column projection for backward compatibility
986                // Use the current view's source table, not the original table
987                let source_table = view.source();
988                let column_indices =
989                    self.resolve_column_indices(source_table, &statement.columns)?;
990                view = view.with_columns(column_indices);
991            }
992        }
993
994        // Apply DISTINCT if specified
995        if statement.distinct {
996            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
997            plan.set_rows_in(view.row_count());
998            plan.add_detail(format!("Input: {} rows", view.row_count()));
999
1000            let distinct_start = Instant::now();
1001            view = self.apply_distinct(view)?;
1002
1003            plan.set_rows_out(view.row_count());
1004            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1005            plan.add_detail(format!(
1006                "Distinct time: {:.3}ms",
1007                distinct_start.elapsed().as_secs_f64() * 1000.0
1008            ));
1009            plan.end_step();
1010        }
1011
1012        // Apply ORDER BY sorting
1013        if let Some(order_by_columns) = &statement.order_by {
1014            if !order_by_columns.is_empty() {
1015                plan.begin_step(
1016                    StepType::Sort,
1017                    format!("ORDER BY {} columns", order_by_columns.len()),
1018                );
1019                plan.set_rows_in(view.row_count());
1020                for col in order_by_columns {
1021                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
1022                }
1023
1024                let sort_start = Instant::now();
1025                view = self.apply_multi_order_by(view, order_by_columns)?;
1026
1027                plan.add_detail(format!(
1028                    "Sort time: {:.3}ms",
1029                    sort_start.elapsed().as_secs_f64() * 1000.0
1030                ));
1031                plan.end_step();
1032            }
1033        }
1034
1035        // Apply LIMIT/OFFSET
1036        if let Some(limit) = statement.limit {
1037            let offset = statement.offset.unwrap_or(0);
1038            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1039            plan.set_rows_in(view.row_count());
1040            if offset > 0 {
1041                plan.add_detail(format!("OFFSET: {}", offset));
1042            }
1043            view = view.with_limit(limit, offset);
1044            plan.set_rows_out(view.row_count());
1045            plan.add_detail(format!("Output: {} rows", view.row_count()));
1046            plan.end_step();
1047        }
1048
1049        Ok(view)
1050    }
1051
1052    /// Resolve column names to indices
1053    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1054        let mut indices = Vec::new();
1055        let table_columns = table.column_names();
1056
1057        for col_name in columns {
1058            let index = table_columns
1059                .iter()
1060                .position(|c| c.eq_ignore_ascii_case(col_name))
1061                .ok_or_else(|| {
1062                    let suggestion = self.find_similar_column(table, col_name);
1063                    match suggestion {
1064                        Some(similar) => anyhow::anyhow!(
1065                            "Column '{}' not found. Did you mean '{}'?",
1066                            col_name,
1067                            similar
1068                        ),
1069                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1070                    }
1071                })?;
1072            indices.push(index);
1073        }
1074
1075        Ok(indices)
1076    }
1077
1078    /// Apply SELECT items (columns and computed expressions) to create new view
1079    fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
1080        debug!(
1081            "QueryEngine::apply_select_items - items: {:?}",
1082            select_items
1083        );
1084        debug!(
1085            "QueryEngine::apply_select_items - input view has {} rows",
1086            view.row_count()
1087        );
1088
1089        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
1090        let has_unnest = select_items.iter().any(|item| match item {
1091            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1092            _ => false,
1093        });
1094
1095        if has_unnest {
1096            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1097            return self.apply_select_with_row_expansion(view, select_items);
1098        }
1099
1100        // Check if this is an aggregate query:
1101        // 1. At least one aggregate function exists
1102        // 2. All other items are either aggregates or constants (aggregate-compatible)
1103        let has_aggregates = select_items.iter().any(|item| match item {
1104            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1105            SelectItem::Column(_) => false,
1106            SelectItem::Star => false,
1107        });
1108
1109        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1110            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1111            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
1112            SelectItem::Star => false,      // Star is not aggregate-compatible
1113        });
1114
1115        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1116            // Special handling for aggregate queries with constants (no GROUP BY)
1117            // These should produce exactly one row
1118            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1119            return self.apply_aggregate_select(view, select_items);
1120        }
1121
1122        // Check if we need to create computed columns
1123        let has_computed_expressions = select_items
1124            .iter()
1125            .any(|item| matches!(item, SelectItem::Expression { .. }));
1126
1127        debug!(
1128            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1129            has_computed_expressions
1130        );
1131
1132        if !has_computed_expressions {
1133            // Simple case: only columns, use existing projection logic
1134            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1135            return Ok(view.with_columns(column_indices));
1136        }
1137
1138        // Complex case: we have computed expressions
1139        // IMPORTANT: We create a PROJECTED view, not a new table
1140        // This preserves the original DataTable reference
1141
1142        let source_table = view.source();
1143        let visible_rows = view.visible_row_indices();
1144
1145        // Create a temporary table just for the computed result view
1146        // But this table is only used for the current query result
1147        let mut computed_table = DataTable::new("query_result");
1148
1149        // First, expand any Star selectors to actual columns
1150        let mut expanded_items = Vec::new();
1151        for item in select_items {
1152            match item {
1153                SelectItem::Star => {
1154                    // Expand * to all columns from source table
1155                    for col_name in source_table.column_names() {
1156                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1157                            col_name.to_string(),
1158                        )));
1159                    }
1160                }
1161                _ => expanded_items.push(item.clone()),
1162            }
1163        }
1164
1165        // Add columns based on expanded SelectItems, handling duplicates
1166        let mut column_name_counts: std::collections::HashMap<String, usize> =
1167            std::collections::HashMap::new();
1168
1169        for item in &expanded_items {
1170            let base_name = match item {
1171                SelectItem::Column(col_ref) => col_ref.name.clone(),
1172                SelectItem::Expression { alias, .. } => alias.clone(),
1173                SelectItem::Star => unreachable!("Star should have been expanded"),
1174            };
1175
1176            // Check if this column name has been used before
1177            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1178            let column_name = if *count == 0 {
1179                // First occurrence, use the name as-is
1180                base_name.clone()
1181            } else {
1182                // Duplicate, append a suffix
1183                format!("{base_name}_{count}")
1184            };
1185            *count += 1;
1186
1187            computed_table.add_column(DataColumn::new(&column_name));
1188        }
1189
1190        // Calculate values for each row
1191        let mut evaluator =
1192            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1193
1194        for &row_idx in visible_rows {
1195            let mut row_values = Vec::new();
1196
1197            for item in &expanded_items {
1198                let value = match item {
1199                    SelectItem::Column(col_ref) => {
1200                        // Column reference with optional table prefix
1201                        let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1202                            // For qualified references, ONLY try qualified lookup - no fallback
1203                            let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1204                            let result = source_table.find_column_by_qualified_name(&qualified_name);
1205
1206                            // Debug: log what qualified names are available when lookup fails
1207                            if result.is_none() {
1208                                let available_qualified: Vec<String> = source_table.columns.iter()
1209                                    .filter_map(|c| c.qualified_name.clone())
1210                                    .collect();
1211                                let available_simple: Vec<String> = source_table.columns.iter()
1212                                    .map(|c| c.name.clone())
1213                                    .collect();
1214                                debug!(
1215                                    "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1216                                    qualified_name, available_qualified, available_simple
1217                                );
1218                                // This MUST return None to trigger error
1219                            }
1220                            result
1221                        } else {
1222                            // Simple column name lookup
1223                            source_table.get_column_index(&col_ref.name)
1224                        }
1225                        .ok_or_else(|| {
1226                            let display_name = if let Some(prefix) = &col_ref.table_prefix {
1227                                format!("{}.{}", prefix, col_ref.name)
1228                            } else {
1229                                col_ref.name.clone()
1230                            };
1231                            let suggestion = self.find_similar_column(source_table, &col_ref.name);
1232                            match suggestion {
1233                                Some(similar) => anyhow::anyhow!(
1234                                    "Column '{}' not found. Did you mean '{}'?",
1235                                    display_name,
1236                                    similar
1237                                ),
1238                                None => anyhow::anyhow!("Column '{}' not found", display_name),
1239                            }
1240                        })?;
1241                        let row = source_table
1242                            .get_row(row_idx)
1243                            .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1244                        row.get(col_idx)
1245                            .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1246                            .clone()
1247                    }
1248                    SelectItem::Expression { expr, .. } => {
1249                        // Computed expression
1250                        evaluator.evaluate(expr, row_idx)?
1251                    }
1252                    SelectItem::Star => unreachable!("Star should have been expanded"),
1253                };
1254                row_values.push(value);
1255            }
1256
1257            computed_table
1258                .add_row(DataRow::new(row_values))
1259                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1260        }
1261
1262        // Return a view of the computed result
1263        // This is a temporary view for this query only
1264        Ok(DataView::new(Arc::new(computed_table)))
1265    }
1266
1267    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
1268    fn apply_select_with_row_expansion(
1269        &self,
1270        view: DataView,
1271        select_items: &[SelectItem],
1272    ) -> Result<DataView> {
1273        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1274
1275        let source_table = view.source();
1276        let visible_rows = view.visible_row_indices();
1277        let expander_registry = RowExpanderRegistry::new();
1278
1279        // Create result table
1280        let mut result_table = DataTable::new("unnest_result");
1281
1282        // Expand * to columns and set up result columns
1283        let mut expanded_items = Vec::new();
1284        for item in select_items {
1285            match item {
1286                SelectItem::Star => {
1287                    for col_name in source_table.column_names() {
1288                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1289                            col_name.to_string(),
1290                        )));
1291                    }
1292                }
1293                _ => expanded_items.push(item.clone()),
1294            }
1295        }
1296
1297        // Add columns to result table
1298        for item in &expanded_items {
1299            let column_name = match item {
1300                SelectItem::Column(col_ref) => col_ref.name.clone(),
1301                SelectItem::Expression { alias, .. } => alias.clone(),
1302                SelectItem::Star => unreachable!("Star should have been expanded"),
1303            };
1304            result_table.add_column(DataColumn::new(&column_name));
1305        }
1306
1307        // Process each input row
1308        let mut evaluator =
1309            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1310
1311        for &row_idx in visible_rows {
1312            // First pass: identify UNNEST expressions and collect their expansion arrays
1313            let mut unnest_expansions = Vec::new();
1314            let mut unnest_indices = Vec::new();
1315
1316            for (col_idx, item) in expanded_items.iter().enumerate() {
1317                if let SelectItem::Expression { expr, .. } = item {
1318                    if let Some(expansion_result) = self.try_expand_unnest(
1319                        expr,
1320                        source_table,
1321                        row_idx,
1322                        &mut evaluator,
1323                        &expander_registry,
1324                    )? {
1325                        unnest_expansions.push(expansion_result);
1326                        unnest_indices.push(col_idx);
1327                    }
1328                }
1329            }
1330
1331            // Determine how many output rows to generate
1332            let expansion_count = if unnest_expansions.is_empty() {
1333                1 // No UNNEST, just one row
1334            } else {
1335                unnest_expansions
1336                    .iter()
1337                    .map(|exp| exp.row_count())
1338                    .max()
1339                    .unwrap_or(1)
1340            };
1341
1342            // Generate output rows
1343            for output_idx in 0..expansion_count {
1344                let mut row_values = Vec::new();
1345
1346                for (col_idx, item) in expanded_items.iter().enumerate() {
1347                    // Check if this column is an UNNEST column
1348                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1349
1350                    let value = if let Some(unnest_idx) = unnest_position {
1351                        // Get value from expansion array (or NULL if exhausted)
1352                        let expansion = &unnest_expansions[unnest_idx];
1353                        expansion
1354                            .values
1355                            .get(output_idx)
1356                            .cloned()
1357                            .unwrap_or(DataValue::Null)
1358                    } else {
1359                        // Regular column or non-UNNEST expression - replicate from input
1360                        match item {
1361                            SelectItem::Column(col_ref) => {
1362                                let col_idx =
1363                                    source_table.get_column_index(&col_ref.name).ok_or_else(
1364                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1365                                    )?;
1366                                let row = source_table
1367                                    .get_row(row_idx)
1368                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1369                                row.get(col_idx)
1370                                    .ok_or_else(|| {
1371                                        anyhow::anyhow!("Column {} not found in row", col_idx)
1372                                    })?
1373                                    .clone()
1374                            }
1375                            SelectItem::Expression { expr, .. } => {
1376                                // Non-UNNEST expression - evaluate once and replicate
1377                                evaluator.evaluate(expr, row_idx)?
1378                            }
1379                            SelectItem::Star => unreachable!(),
1380                        }
1381                    };
1382
1383                    row_values.push(value);
1384                }
1385
1386                result_table
1387                    .add_row(DataRow::new(row_values))
1388                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1389            }
1390        }
1391
1392        debug!(
1393            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1394            visible_rows.len(),
1395            result_table.row_count()
1396        );
1397
1398        Ok(DataView::new(Arc::new(result_table)))
1399    }
1400
1401    /// Try to expand an expression if it's an UNNEST call
1402    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
1403    fn try_expand_unnest(
1404        &self,
1405        expr: &SqlExpression,
1406        _source_table: &DataTable,
1407        row_idx: usize,
1408        evaluator: &mut ArithmeticEvaluator,
1409        expander_registry: &RowExpanderRegistry,
1410    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1411        // Check for UNNEST variant (direct syntax)
1412        if let SqlExpression::Unnest { column, delimiter } = expr {
1413            // Evaluate the column expression
1414            let column_value = evaluator.evaluate(column, row_idx)?;
1415
1416            // Delimiter is already a string literal
1417            let delimiter_value = DataValue::String(delimiter.clone());
1418
1419            // Get the UNNEST expander
1420            let expander = expander_registry
1421                .get("UNNEST")
1422                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1423
1424            // Expand the value
1425            let expansion = expander.expand(&column_value, &[delimiter_value])?;
1426            return Ok(Some(expansion));
1427        }
1428
1429        // Also check for FunctionCall form (for compatibility)
1430        if let SqlExpression::FunctionCall { name, args, .. } = expr {
1431            if name.to_uppercase() == "UNNEST" {
1432                // UNNEST(column, delimiter)
1433                if args.len() != 2 {
1434                    return Err(anyhow::anyhow!(
1435                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1436                    ));
1437                }
1438
1439                // Evaluate the column expression (first arg)
1440                let column_value = evaluator.evaluate(&args[0], row_idx)?;
1441
1442                // Evaluate the delimiter expression (second arg)
1443                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1444
1445                // Get the UNNEST expander
1446                let expander = expander_registry
1447                    .get("UNNEST")
1448                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1449
1450                // Expand the value
1451                let expansion = expander.expand(&column_value, &[delimiter_value])?;
1452                return Ok(Some(expansion));
1453            }
1454        }
1455
1456        Ok(None)
1457    }
1458
1459    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1460    fn apply_aggregate_select(
1461        &self,
1462        view: DataView,
1463        select_items: &[SelectItem],
1464    ) -> Result<DataView> {
1465        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1466
1467        let source_table = view.source();
1468        let mut result_table = DataTable::new("aggregate_result");
1469
1470        // Add columns for each select item
1471        for item in select_items {
1472            let column_name = match item {
1473                SelectItem::Expression { alias, .. } => alias.clone(),
1474                _ => unreachable!("Should only have expressions in aggregate-only query"),
1475            };
1476            result_table.add_column(DataColumn::new(&column_name));
1477        }
1478
1479        // Create evaluator with visible rows from the view (for filtered aggregates)
1480        let visible_rows = view.visible_row_indices().to_vec();
1481        let mut evaluator =
1482            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1483                .with_visible_rows(visible_rows);
1484
1485        // Evaluate each aggregate expression once (they handle all rows internally)
1486        let mut row_values = Vec::new();
1487        for item in select_items {
1488            match item {
1489                SelectItem::Expression { expr, .. } => {
1490                    // The evaluator will handle aggregates over all rows
1491                    // We pass row_index=0 but aggregates ignore it and process all rows
1492                    let value = evaluator.evaluate(expr, 0)?;
1493                    row_values.push(value);
1494                }
1495                _ => unreachable!("Should only have expressions in aggregate-only query"),
1496            }
1497        }
1498
1499        // Add the single result row
1500        result_table
1501            .add_row(DataRow::new(row_values))
1502            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1503
1504        Ok(DataView::new(Arc::new(result_table)))
1505    }
1506
1507    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1508    fn resolve_select_columns(
1509        &self,
1510        table: &DataTable,
1511        select_items: &[SelectItem],
1512    ) -> Result<Vec<usize>> {
1513        let mut indices = Vec::new();
1514        let table_columns = table.column_names();
1515
1516        for item in select_items {
1517            match item {
1518                SelectItem::Column(col_ref) => {
1519                    // Check if this has a table prefix
1520                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
1521                        // For qualified references, ONLY try qualified lookup - no fallback
1522                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1523                        table.find_column_by_qualified_name(&qualified_name)
1524                            .ok_or_else(|| {
1525                                // Check if any columns have qualified names for better error message
1526                                let has_qualified = table.columns.iter()
1527                                    .any(|c| c.qualified_name.is_some());
1528                                if !has_qualified {
1529                                    anyhow::anyhow!(
1530                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1531                                        qualified_name, table_prefix
1532                                    )
1533                                } else {
1534                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
1535                                }
1536                            })?
1537                    } else {
1538                        // Simple column name lookup
1539                        table_columns
1540                            .iter()
1541                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1542                            .ok_or_else(|| {
1543                                let suggestion = self.find_similar_column(table, &col_ref.name);
1544                                match suggestion {
1545                                    Some(similar) => anyhow::anyhow!(
1546                                        "Column '{}' not found. Did you mean '{}'?",
1547                                        col_ref.name,
1548                                        similar
1549                                    ),
1550                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1551                                }
1552                            })?
1553                    };
1554                    indices.push(index);
1555                }
1556                SelectItem::Star => {
1557                    // Expand * to all column indices
1558                    for i in 0..table_columns.len() {
1559                        indices.push(i);
1560                    }
1561                }
1562                SelectItem::Expression { .. } => {
1563                    return Err(anyhow::anyhow!(
1564                        "Computed expressions require new table creation"
1565                    ));
1566                }
1567            }
1568        }
1569
1570        Ok(indices)
1571    }
1572
1573    /// Apply DISTINCT to remove duplicate rows
1574    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1575        use std::collections::HashSet;
1576
1577        let source = view.source();
1578        let visible_cols = view.visible_column_indices();
1579        let visible_rows = view.visible_row_indices();
1580
1581        // Build a set to track unique rows
1582        let mut seen_rows = HashSet::new();
1583        let mut unique_row_indices = Vec::new();
1584
1585        for &row_idx in visible_rows {
1586            // Build a key representing this row's visible column values
1587            let mut row_key = Vec::new();
1588            for &col_idx in visible_cols {
1589                let value = source
1590                    .get_value(row_idx, col_idx)
1591                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1592                // Convert value to a hashable representation
1593                row_key.push(format!("{:?}", value));
1594            }
1595
1596            // Check if we've seen this row before
1597            if seen_rows.insert(row_key) {
1598                // First time seeing this row combination
1599                unique_row_indices.push(row_idx);
1600            }
1601        }
1602
1603        // Create a new view with only unique rows
1604        Ok(view.with_rows(unique_row_indices))
1605    }
1606
1607    /// Apply multi-column ORDER BY sorting to the view
1608    fn apply_multi_order_by(
1609        &self,
1610        mut view: DataView,
1611        order_by_columns: &[OrderByColumn],
1612    ) -> Result<DataView> {
1613        // Build list of (source_column_index, ascending) tuples
1614        let mut sort_columns = Vec::new();
1615
1616        for order_col in order_by_columns {
1617            // Try to find the column index
1618            // Check in the current view's source table (this handles both regular columns and computed columns)
1619            // This is especially important after GROUP BY where we have a new result table with aggregate aliases
1620            let col_index = view
1621                .source()
1622                .get_column_index(&order_col.column)
1623                .ok_or_else(|| {
1624                    // If not found, provide helpful error with suggestions
1625                    let suggestion = self.find_similar_column(view.source(), &order_col.column);
1626                    match suggestion {
1627                        Some(similar) => anyhow::anyhow!(
1628                            "Column '{}' not found. Did you mean '{}'?",
1629                            order_col.column,
1630                            similar
1631                        ),
1632                        None => {
1633                            // Also list available columns for debugging
1634                            let available_cols = view.source().column_names().join(", ");
1635                            anyhow::anyhow!(
1636                                "Column '{}' not found. Available columns: {}",
1637                                order_col.column,
1638                                available_cols
1639                            )
1640                        }
1641                    }
1642                })?;
1643
1644            let ascending = matches!(order_col.direction, SortDirection::Asc);
1645            sort_columns.push((col_index, ascending));
1646        }
1647
1648        // Apply multi-column sorting
1649        view.apply_multi_sort(&sort_columns)?;
1650        Ok(view)
1651    }
1652
1653    /// Apply GROUP BY to the view with optional HAVING clause
1654    fn apply_group_by(
1655        &self,
1656        view: DataView,
1657        group_by_exprs: &[SqlExpression],
1658        select_items: &[SelectItem],
1659        having: Option<&SqlExpression>,
1660        plan: &mut ExecutionPlanBuilder,
1661    ) -> Result<DataView> {
1662        // Use the new expression-based GROUP BY implementation
1663        let (result_view, phase_info) = self.apply_group_by_expressions(
1664            view,
1665            group_by_exprs,
1666            select_items,
1667            having,
1668            self.case_insensitive,
1669            self.date_notation.clone(),
1670        )?;
1671
1672        // Add detailed phase information to the execution plan
1673        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1674        plan.add_detail(format!(
1675            "Phase 1 - Group Building: {:.3}ms",
1676            phase_info.phase2_key_building.as_secs_f64() * 1000.0
1677        ));
1678        plan.add_detail(format!(
1679            "  • Processing {} rows into {} groups",
1680            phase_info.total_rows, phase_info.num_groups
1681        ));
1682        plan.add_detail(format!(
1683            "Phase 2 - Aggregation: {:.3}ms",
1684            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
1685        ));
1686        if phase_info.phase4_having_evaluation > Duration::ZERO {
1687            plan.add_detail(format!(
1688                "Phase 3 - HAVING Filter: {:.3}ms",
1689                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
1690            ));
1691            plan.add_detail(format!(
1692                "  • Filtered {} groups",
1693                phase_info.groups_filtered_by_having
1694            ));
1695        }
1696        plan.add_detail(format!(
1697            "Total GROUP BY time: {:.3}ms",
1698            phase_info.total_time.as_secs_f64() * 1000.0
1699        ));
1700
1701        Ok(result_view)
1702    }
1703
1704    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
1705    /// This helps pre-size hash tables for better performance
1706    pub fn estimate_group_cardinality(
1707        &self,
1708        view: &DataView,
1709        group_by_exprs: &[SqlExpression],
1710    ) -> usize {
1711        // If we have few rows, just return the row count as upper bound
1712        let row_count = view.get_visible_rows().len();
1713        if row_count <= 100 {
1714            return row_count;
1715        }
1716
1717        // Sample first 1000 rows or 10% of data, whichever is smaller
1718        let sample_size = min(1000, row_count / 10).max(100);
1719        let mut seen = FxHashSet::default();
1720
1721        let visible_rows = view.get_visible_rows();
1722        for (i, &row_idx) in visible_rows.iter().enumerate() {
1723            if i >= sample_size {
1724                break;
1725            }
1726
1727            // Evaluate GROUP BY expressions for this row
1728            let mut key_values = Vec::new();
1729            for expr in group_by_exprs {
1730                let mut evaluator = ArithmeticEvaluator::new(view.source());
1731                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1732                key_values.push(value);
1733            }
1734
1735            seen.insert(key_values);
1736        }
1737
1738        // Estimate total cardinality based on sample
1739        let sample_cardinality = seen.len();
1740        let estimated = (sample_cardinality * row_count) / sample_size;
1741
1742        // Cap at row count and ensure minimum of sample cardinality
1743        estimated.min(row_count).max(sample_cardinality)
1744    }
1745}
1746
1747#[cfg(test)]
1748mod tests {
1749    use super::*;
1750    use crate::data::datatable::{DataColumn, DataRow, DataValue};
1751
1752    fn create_test_table() -> Arc<DataTable> {
1753        let mut table = DataTable::new("test");
1754
1755        // Add columns
1756        table.add_column(DataColumn::new("id"));
1757        table.add_column(DataColumn::new("name"));
1758        table.add_column(DataColumn::new("age"));
1759
1760        // Add rows
1761        table
1762            .add_row(DataRow::new(vec![
1763                DataValue::Integer(1),
1764                DataValue::String("Alice".to_string()),
1765                DataValue::Integer(30),
1766            ]))
1767            .unwrap();
1768
1769        table
1770            .add_row(DataRow::new(vec![
1771                DataValue::Integer(2),
1772                DataValue::String("Bob".to_string()),
1773                DataValue::Integer(25),
1774            ]))
1775            .unwrap();
1776
1777        table
1778            .add_row(DataRow::new(vec![
1779                DataValue::Integer(3),
1780                DataValue::String("Charlie".to_string()),
1781                DataValue::Integer(35),
1782            ]))
1783            .unwrap();
1784
1785        Arc::new(table)
1786    }
1787
1788    #[test]
1789    fn test_select_all() {
1790        let table = create_test_table();
1791        let engine = QueryEngine::new();
1792
1793        let view = engine
1794            .execute(table.clone(), "SELECT * FROM users")
1795            .unwrap();
1796        assert_eq!(view.row_count(), 3);
1797        assert_eq!(view.column_count(), 3);
1798    }
1799
1800    #[test]
1801    fn test_select_columns() {
1802        let table = create_test_table();
1803        let engine = QueryEngine::new();
1804
1805        let view = engine
1806            .execute(table.clone(), "SELECT name, age FROM users")
1807            .unwrap();
1808        assert_eq!(view.row_count(), 3);
1809        assert_eq!(view.column_count(), 2);
1810    }
1811
1812    #[test]
1813    fn test_select_with_limit() {
1814        let table = create_test_table();
1815        let engine = QueryEngine::new();
1816
1817        let view = engine
1818            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1819            .unwrap();
1820        assert_eq!(view.row_count(), 2);
1821    }
1822
1823    #[test]
1824    fn test_type_coercion_contains() {
1825        // Initialize tracing for debug output
1826        let _ = tracing_subscriber::fmt()
1827            .with_max_level(tracing::Level::DEBUG)
1828            .try_init();
1829
1830        let mut table = DataTable::new("test");
1831        table.add_column(DataColumn::new("id"));
1832        table.add_column(DataColumn::new("status"));
1833        table.add_column(DataColumn::new("price"));
1834
1835        // Add test data with mixed types
1836        table
1837            .add_row(DataRow::new(vec![
1838                DataValue::Integer(1),
1839                DataValue::String("Pending".to_string()),
1840                DataValue::Float(99.99),
1841            ]))
1842            .unwrap();
1843
1844        table
1845            .add_row(DataRow::new(vec![
1846                DataValue::Integer(2),
1847                DataValue::String("Confirmed".to_string()),
1848                DataValue::Float(150.50),
1849            ]))
1850            .unwrap();
1851
1852        table
1853            .add_row(DataRow::new(vec![
1854                DataValue::Integer(3),
1855                DataValue::String("Pending".to_string()),
1856                DataValue::Float(75.00),
1857            ]))
1858            .unwrap();
1859
1860        let table = Arc::new(table);
1861        let engine = QueryEngine::new();
1862
1863        println!("\n=== Testing WHERE clause with Contains ===");
1864        println!("Table has {} rows", table.row_count());
1865        for i in 0..table.row_count() {
1866            let status = table.get_value(i, 1);
1867            println!("Row {i}: status = {status:?}");
1868        }
1869
1870        // Test 1: Basic string contains (should work)
1871        println!("\n--- Test 1: status.Contains('pend') ---");
1872        let result = engine.execute(
1873            table.clone(),
1874            "SELECT * FROM test WHERE status.Contains('pend')",
1875        );
1876        match result {
1877            Ok(view) => {
1878                println!("SUCCESS: Found {} matching rows", view.row_count());
1879                assert_eq!(view.row_count(), 2); // Should find both Pending rows
1880            }
1881            Err(e) => {
1882                panic!("Query failed: {e}");
1883            }
1884        }
1885
1886        // Test 2: Numeric contains (should work with type coercion)
1887        println!("\n--- Test 2: price.Contains('9') ---");
1888        let result = engine.execute(
1889            table.clone(),
1890            "SELECT * FROM test WHERE price.Contains('9')",
1891        );
1892        match result {
1893            Ok(view) => {
1894                println!(
1895                    "SUCCESS: Found {} matching rows with price containing '9'",
1896                    view.row_count()
1897                );
1898                // Should find 99.99 row
1899                assert!(view.row_count() >= 1);
1900            }
1901            Err(e) => {
1902                panic!("Numeric coercion query failed: {e}");
1903            }
1904        }
1905
1906        println!("\n=== All tests passed! ===");
1907    }
1908
1909    #[test]
1910    fn test_not_in_clause() {
1911        // Initialize tracing for debug output
1912        let _ = tracing_subscriber::fmt()
1913            .with_max_level(tracing::Level::DEBUG)
1914            .try_init();
1915
1916        let mut table = DataTable::new("test");
1917        table.add_column(DataColumn::new("id"));
1918        table.add_column(DataColumn::new("country"));
1919
1920        // Add test data
1921        table
1922            .add_row(DataRow::new(vec![
1923                DataValue::Integer(1),
1924                DataValue::String("CA".to_string()),
1925            ]))
1926            .unwrap();
1927
1928        table
1929            .add_row(DataRow::new(vec![
1930                DataValue::Integer(2),
1931                DataValue::String("US".to_string()),
1932            ]))
1933            .unwrap();
1934
1935        table
1936            .add_row(DataRow::new(vec![
1937                DataValue::Integer(3),
1938                DataValue::String("UK".to_string()),
1939            ]))
1940            .unwrap();
1941
1942        let table = Arc::new(table);
1943        let engine = QueryEngine::new();
1944
1945        println!("\n=== Testing NOT IN clause ===");
1946        println!("Table has {} rows", table.row_count());
1947        for i in 0..table.row_count() {
1948            let country = table.get_value(i, 1);
1949            println!("Row {i}: country = {country:?}");
1950        }
1951
1952        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
1953        println!("\n--- Test: country NOT IN ('CA') ---");
1954        let result = engine.execute(
1955            table.clone(),
1956            "SELECT * FROM test WHERE country NOT IN ('CA')",
1957        );
1958        match result {
1959            Ok(view) => {
1960                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1961                assert_eq!(view.row_count(), 2); // Should find US and UK
1962            }
1963            Err(e) => {
1964                panic!("NOT IN query failed: {e}");
1965            }
1966        }
1967
1968        println!("\n=== NOT IN test complete! ===");
1969    }
1970
1971    #[test]
1972    fn test_case_insensitive_in_and_not_in() {
1973        // Initialize tracing for debug output
1974        let _ = tracing_subscriber::fmt()
1975            .with_max_level(tracing::Level::DEBUG)
1976            .try_init();
1977
1978        let mut table = DataTable::new("test");
1979        table.add_column(DataColumn::new("id"));
1980        table.add_column(DataColumn::new("country"));
1981
1982        // Add test data with mixed case
1983        table
1984            .add_row(DataRow::new(vec![
1985                DataValue::Integer(1),
1986                DataValue::String("CA".to_string()), // uppercase
1987            ]))
1988            .unwrap();
1989
1990        table
1991            .add_row(DataRow::new(vec![
1992                DataValue::Integer(2),
1993                DataValue::String("us".to_string()), // lowercase
1994            ]))
1995            .unwrap();
1996
1997        table
1998            .add_row(DataRow::new(vec![
1999                DataValue::Integer(3),
2000                DataValue::String("UK".to_string()), // uppercase
2001            ]))
2002            .unwrap();
2003
2004        let table = Arc::new(table);
2005
2006        println!("\n=== Testing Case-Insensitive IN clause ===");
2007        println!("Table has {} rows", table.row_count());
2008        for i in 0..table.row_count() {
2009            let country = table.get_value(i, 1);
2010            println!("Row {i}: country = {country:?}");
2011        }
2012
2013        // Test case-insensitive IN - should match 'CA' with 'ca'
2014        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
2015        let engine = QueryEngine::with_case_insensitive(true);
2016        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2017        match result {
2018            Ok(view) => {
2019                println!(
2020                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
2021                    view.row_count()
2022                );
2023                assert_eq!(view.row_count(), 1); // Should find CA row
2024            }
2025            Err(e) => {
2026                panic!("Case-insensitive IN query failed: {e}");
2027            }
2028        }
2029
2030        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
2031        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
2032        let result = engine.execute(
2033            table.clone(),
2034            "SELECT * FROM test WHERE country NOT IN ('ca')",
2035        );
2036        match result {
2037            Ok(view) => {
2038                println!(
2039                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
2040                    view.row_count()
2041                );
2042                assert_eq!(view.row_count(), 2); // Should find us and UK rows
2043            }
2044            Err(e) => {
2045                panic!("Case-insensitive NOT IN query failed: {e}");
2046            }
2047        }
2048
2049        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
2050        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
2051        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
2052        let result = engine_case_sensitive
2053            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2054        match result {
2055            Ok(view) => {
2056                println!(
2057                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
2058                    view.row_count()
2059                );
2060                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
2061            }
2062            Err(e) => {
2063                panic!("Case-sensitive IN query failed: {e}");
2064            }
2065        }
2066
2067        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
2068    }
2069
2070    #[test]
2071    #[ignore = "Parentheses in WHERE clause not yet implemented"]
2072    fn test_parentheses_in_where_clause() {
2073        // Initialize tracing for debug output
2074        let _ = tracing_subscriber::fmt()
2075            .with_max_level(tracing::Level::DEBUG)
2076            .try_init();
2077
2078        let mut table = DataTable::new("test");
2079        table.add_column(DataColumn::new("id"));
2080        table.add_column(DataColumn::new("status"));
2081        table.add_column(DataColumn::new("priority"));
2082
2083        // Add test data
2084        table
2085            .add_row(DataRow::new(vec![
2086                DataValue::Integer(1),
2087                DataValue::String("Pending".to_string()),
2088                DataValue::String("High".to_string()),
2089            ]))
2090            .unwrap();
2091
2092        table
2093            .add_row(DataRow::new(vec![
2094                DataValue::Integer(2),
2095                DataValue::String("Complete".to_string()),
2096                DataValue::String("High".to_string()),
2097            ]))
2098            .unwrap();
2099
2100        table
2101            .add_row(DataRow::new(vec![
2102                DataValue::Integer(3),
2103                DataValue::String("Pending".to_string()),
2104                DataValue::String("Low".to_string()),
2105            ]))
2106            .unwrap();
2107
2108        table
2109            .add_row(DataRow::new(vec![
2110                DataValue::Integer(4),
2111                DataValue::String("Complete".to_string()),
2112                DataValue::String("Low".to_string()),
2113            ]))
2114            .unwrap();
2115
2116        let table = Arc::new(table);
2117        let engine = QueryEngine::new();
2118
2119        println!("\n=== Testing Parentheses in WHERE clause ===");
2120        println!("Table has {} rows", table.row_count());
2121        for i in 0..table.row_count() {
2122            let status = table.get_value(i, 1);
2123            let priority = table.get_value(i, 2);
2124            println!("Row {i}: status = {status:?}, priority = {priority:?}");
2125        }
2126
2127        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
2128        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2129        let result = engine.execute(
2130            table.clone(),
2131            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2132        );
2133        match result {
2134            Ok(view) => {
2135                println!(
2136                    "SUCCESS: Found {} rows with parenthetical logic",
2137                    view.row_count()
2138                );
2139                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
2140            }
2141            Err(e) => {
2142                panic!("Parentheses query failed: {e}");
2143            }
2144        }
2145
2146        println!("\n=== Parentheses test complete! ===");
2147    }
2148
2149    #[test]
2150    #[ignore = "Numeric type coercion needs fixing"]
2151    fn test_numeric_type_coercion() {
2152        // Initialize tracing for debug output
2153        let _ = tracing_subscriber::fmt()
2154            .with_max_level(tracing::Level::DEBUG)
2155            .try_init();
2156
2157        let mut table = DataTable::new("test");
2158        table.add_column(DataColumn::new("id"));
2159        table.add_column(DataColumn::new("price"));
2160        table.add_column(DataColumn::new("quantity"));
2161
2162        // Add test data with different numeric types
2163        table
2164            .add_row(DataRow::new(vec![
2165                DataValue::Integer(1),
2166                DataValue::Float(99.50), // Contains '.'
2167                DataValue::Integer(100),
2168            ]))
2169            .unwrap();
2170
2171        table
2172            .add_row(DataRow::new(vec![
2173                DataValue::Integer(2),
2174                DataValue::Float(150.0), // Contains '.' and '0'
2175                DataValue::Integer(200),
2176            ]))
2177            .unwrap();
2178
2179        table
2180            .add_row(DataRow::new(vec![
2181                DataValue::Integer(3),
2182                DataValue::Integer(75), // No decimal point
2183                DataValue::Integer(50),
2184            ]))
2185            .unwrap();
2186
2187        let table = Arc::new(table);
2188        let engine = QueryEngine::new();
2189
2190        println!("\n=== Testing Numeric Type Coercion ===");
2191        println!("Table has {} rows", table.row_count());
2192        for i in 0..table.row_count() {
2193            let price = table.get_value(i, 1);
2194            let quantity = table.get_value(i, 2);
2195            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
2196        }
2197
2198        // Test Contains on float values - should find rows with decimal points
2199        println!("\n--- Test: price.Contains('.') ---");
2200        let result = engine.execute(
2201            table.clone(),
2202            "SELECT * FROM test WHERE price.Contains('.')",
2203        );
2204        match result {
2205            Ok(view) => {
2206                println!(
2207                    "SUCCESS: Found {} rows with decimal points in price",
2208                    view.row_count()
2209                );
2210                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
2211            }
2212            Err(e) => {
2213                panic!("Numeric Contains query failed: {e}");
2214            }
2215        }
2216
2217        // Test Contains on integer values converted to string
2218        println!("\n--- Test: quantity.Contains('0') ---");
2219        let result = engine.execute(
2220            table.clone(),
2221            "SELECT * FROM test WHERE quantity.Contains('0')",
2222        );
2223        match result {
2224            Ok(view) => {
2225                println!(
2226                    "SUCCESS: Found {} rows with '0' in quantity",
2227                    view.row_count()
2228                );
2229                assert_eq!(view.row_count(), 2); // Should find 100 and 200
2230            }
2231            Err(e) => {
2232                panic!("Integer Contains query failed: {e}");
2233            }
2234        }
2235
2236        println!("\n=== Numeric type coercion test complete! ===");
2237    }
2238
2239    #[test]
2240    fn test_datetime_comparisons() {
2241        // Initialize tracing for debug output
2242        let _ = tracing_subscriber::fmt()
2243            .with_max_level(tracing::Level::DEBUG)
2244            .try_init();
2245
2246        let mut table = DataTable::new("test");
2247        table.add_column(DataColumn::new("id"));
2248        table.add_column(DataColumn::new("created_date"));
2249
2250        // Add test data with date strings (as they would come from CSV)
2251        table
2252            .add_row(DataRow::new(vec![
2253                DataValue::Integer(1),
2254                DataValue::String("2024-12-15".to_string()),
2255            ]))
2256            .unwrap();
2257
2258        table
2259            .add_row(DataRow::new(vec![
2260                DataValue::Integer(2),
2261                DataValue::String("2025-01-15".to_string()),
2262            ]))
2263            .unwrap();
2264
2265        table
2266            .add_row(DataRow::new(vec![
2267                DataValue::Integer(3),
2268                DataValue::String("2025-02-15".to_string()),
2269            ]))
2270            .unwrap();
2271
2272        let table = Arc::new(table);
2273        let engine = QueryEngine::new();
2274
2275        println!("\n=== Testing DateTime Comparisons ===");
2276        println!("Table has {} rows", table.row_count());
2277        for i in 0..table.row_count() {
2278            let date = table.get_value(i, 1);
2279            println!("Row {i}: created_date = {date:?}");
2280        }
2281
2282        // Test DateTime constructor comparison - should find dates after 2025-01-01
2283        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2284        let result = engine.execute(
2285            table.clone(),
2286            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2287        );
2288        match result {
2289            Ok(view) => {
2290                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2291                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
2292            }
2293            Err(e) => {
2294                panic!("DateTime comparison query failed: {e}");
2295            }
2296        }
2297
2298        println!("\n=== DateTime comparison test complete! ===");
2299    }
2300
2301    #[test]
2302    fn test_not_with_method_calls() {
2303        // Initialize tracing for debug output
2304        let _ = tracing_subscriber::fmt()
2305            .with_max_level(tracing::Level::DEBUG)
2306            .try_init();
2307
2308        let mut table = DataTable::new("test");
2309        table.add_column(DataColumn::new("id"));
2310        table.add_column(DataColumn::new("status"));
2311
2312        // Add test data
2313        table
2314            .add_row(DataRow::new(vec![
2315                DataValue::Integer(1),
2316                DataValue::String("Pending Review".to_string()),
2317            ]))
2318            .unwrap();
2319
2320        table
2321            .add_row(DataRow::new(vec![
2322                DataValue::Integer(2),
2323                DataValue::String("Complete".to_string()),
2324            ]))
2325            .unwrap();
2326
2327        table
2328            .add_row(DataRow::new(vec![
2329                DataValue::Integer(3),
2330                DataValue::String("Pending Approval".to_string()),
2331            ]))
2332            .unwrap();
2333
2334        let table = Arc::new(table);
2335        let engine = QueryEngine::with_case_insensitive(true);
2336
2337        println!("\n=== Testing NOT with Method Calls ===");
2338        println!("Table has {} rows", table.row_count());
2339        for i in 0..table.row_count() {
2340            let status = table.get_value(i, 1);
2341            println!("Row {i}: status = {status:?}");
2342        }
2343
2344        // Test NOT with Contains - should exclude rows containing "pend"
2345        println!("\n--- Test: NOT status.Contains('pend') ---");
2346        let result = engine.execute(
2347            table.clone(),
2348            "SELECT * FROM test WHERE NOT status.Contains('pend')",
2349        );
2350        match result {
2351            Ok(view) => {
2352                println!(
2353                    "SUCCESS: Found {} rows NOT containing 'pend'",
2354                    view.row_count()
2355                );
2356                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2357            }
2358            Err(e) => {
2359                panic!("NOT Contains query failed: {e}");
2360            }
2361        }
2362
2363        // Test NOT with StartsWith
2364        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2365        let result = engine.execute(
2366            table.clone(),
2367            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2368        );
2369        match result {
2370            Ok(view) => {
2371                println!(
2372                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
2373                    view.row_count()
2374                );
2375                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2376            }
2377            Err(e) => {
2378                panic!("NOT StartsWith query failed: {e}");
2379            }
2380        }
2381
2382        println!("\n=== NOT with method calls test complete! ===");
2383    }
2384
2385    #[test]
2386    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2387    fn test_complex_logical_expressions() {
2388        // Initialize tracing for debug output
2389        let _ = tracing_subscriber::fmt()
2390            .with_max_level(tracing::Level::DEBUG)
2391            .try_init();
2392
2393        let mut table = DataTable::new("test");
2394        table.add_column(DataColumn::new("id"));
2395        table.add_column(DataColumn::new("status"));
2396        table.add_column(DataColumn::new("priority"));
2397        table.add_column(DataColumn::new("assigned"));
2398
2399        // Add comprehensive test data
2400        table
2401            .add_row(DataRow::new(vec![
2402                DataValue::Integer(1),
2403                DataValue::String("Pending".to_string()),
2404                DataValue::String("High".to_string()),
2405                DataValue::String("John".to_string()),
2406            ]))
2407            .unwrap();
2408
2409        table
2410            .add_row(DataRow::new(vec![
2411                DataValue::Integer(2),
2412                DataValue::String("Complete".to_string()),
2413                DataValue::String("High".to_string()),
2414                DataValue::String("Jane".to_string()),
2415            ]))
2416            .unwrap();
2417
2418        table
2419            .add_row(DataRow::new(vec![
2420                DataValue::Integer(3),
2421                DataValue::String("Pending".to_string()),
2422                DataValue::String("Low".to_string()),
2423                DataValue::String("John".to_string()),
2424            ]))
2425            .unwrap();
2426
2427        table
2428            .add_row(DataRow::new(vec![
2429                DataValue::Integer(4),
2430                DataValue::String("In Progress".to_string()),
2431                DataValue::String("Medium".to_string()),
2432                DataValue::String("Jane".to_string()),
2433            ]))
2434            .unwrap();
2435
2436        let table = Arc::new(table);
2437        let engine = QueryEngine::new();
2438
2439        println!("\n=== Testing Complex Logical Expressions ===");
2440        println!("Table has {} rows", table.row_count());
2441        for i in 0..table.row_count() {
2442            let status = table.get_value(i, 1);
2443            let priority = table.get_value(i, 2);
2444            let assigned = table.get_value(i, 3);
2445            println!(
2446                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2447            );
2448        }
2449
2450        // Test complex AND/OR logic
2451        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2452        let result = engine.execute(
2453            table.clone(),
2454            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2455        );
2456        match result {
2457            Ok(view) => {
2458                println!(
2459                    "SUCCESS: Found {} rows with complex logic",
2460                    view.row_count()
2461                );
2462                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2463            }
2464            Err(e) => {
2465                panic!("Complex logic query failed: {e}");
2466            }
2467        }
2468
2469        // Test NOT with complex expressions
2470        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2471        let result = engine.execute(
2472            table.clone(),
2473            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2474        );
2475        match result {
2476            Ok(view) => {
2477                println!(
2478                    "SUCCESS: Found {} rows with NOT complex logic",
2479                    view.row_count()
2480                );
2481                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2482            }
2483            Err(e) => {
2484                panic!("NOT complex logic query failed: {e}");
2485            }
2486        }
2487
2488        println!("\n=== Complex logical expressions test complete! ===");
2489    }
2490
2491    #[test]
2492    fn test_mixed_data_types_and_edge_cases() {
2493        // Initialize tracing for debug output
2494        let _ = tracing_subscriber::fmt()
2495            .with_max_level(tracing::Level::DEBUG)
2496            .try_init();
2497
2498        let mut table = DataTable::new("test");
2499        table.add_column(DataColumn::new("id"));
2500        table.add_column(DataColumn::new("value"));
2501        table.add_column(DataColumn::new("nullable_field"));
2502
2503        // Add test data with mixed types and edge cases
2504        table
2505            .add_row(DataRow::new(vec![
2506                DataValue::Integer(1),
2507                DataValue::String("123.45".to_string()),
2508                DataValue::String("present".to_string()),
2509            ]))
2510            .unwrap();
2511
2512        table
2513            .add_row(DataRow::new(vec![
2514                DataValue::Integer(2),
2515                DataValue::Float(678.90),
2516                DataValue::Null,
2517            ]))
2518            .unwrap();
2519
2520        table
2521            .add_row(DataRow::new(vec![
2522                DataValue::Integer(3),
2523                DataValue::Boolean(true),
2524                DataValue::String("also present".to_string()),
2525            ]))
2526            .unwrap();
2527
2528        table
2529            .add_row(DataRow::new(vec![
2530                DataValue::Integer(4),
2531                DataValue::String("false".to_string()),
2532                DataValue::Null,
2533            ]))
2534            .unwrap();
2535
2536        let table = Arc::new(table);
2537        let engine = QueryEngine::new();
2538
2539        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2540        println!("Table has {} rows", table.row_count());
2541        for i in 0..table.row_count() {
2542            let value = table.get_value(i, 1);
2543            let nullable = table.get_value(i, 2);
2544            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2545        }
2546
2547        // Test type coercion with boolean Contains
2548        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2549        let result = engine.execute(
2550            table.clone(),
2551            "SELECT * FROM test WHERE value.Contains('true')",
2552        );
2553        match result {
2554            Ok(view) => {
2555                println!(
2556                    "SUCCESS: Found {} rows with boolean coercion",
2557                    view.row_count()
2558                );
2559                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2560            }
2561            Err(e) => {
2562                panic!("Boolean coercion query failed: {e}");
2563            }
2564        }
2565
2566        // Test multiple IN values with mixed types
2567        println!("\n--- Test: id IN (1, 3) ---");
2568        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2569        match result {
2570            Ok(view) => {
2571                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2572                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2573            }
2574            Err(e) => {
2575                panic!("Multiple IN values query failed: {e}");
2576            }
2577        }
2578
2579        println!("\n=== Mixed data types test complete! ===");
2580    }
2581
2582    /// Test that aggregate-only queries return exactly one row (regression test)
2583    #[test]
2584    fn test_aggregate_only_single_row() {
2585        let table = create_test_stock_data();
2586        let engine = QueryEngine::new();
2587
2588        // Test query with multiple aggregates - should return exactly 1 row
2589        let result = engine
2590            .execute(
2591                table.clone(),
2592                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2593            )
2594            .expect("Query should succeed");
2595
2596        assert_eq!(
2597            result.row_count(),
2598            1,
2599            "Aggregate-only query should return exactly 1 row"
2600        );
2601        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2602
2603        // Verify the actual values are correct
2604        let source = result.source();
2605        let row = source.get_row(0).expect("Should have first row");
2606
2607        // COUNT(*) should be 5 (total rows)
2608        assert_eq!(row.values[0], DataValue::Integer(5));
2609
2610        // MIN should be 99.5
2611        assert_eq!(row.values[1], DataValue::Float(99.5));
2612
2613        // MAX should be 105.0
2614        assert_eq!(row.values[2], DataValue::Float(105.0));
2615
2616        // AVG should be approximately 102.4
2617        if let DataValue::Float(avg) = &row.values[3] {
2618            assert!(
2619                (avg - 102.4).abs() < 0.01,
2620                "Average should be approximately 102.4, got {}",
2621                avg
2622            );
2623        } else {
2624            panic!("AVG should return a Float value");
2625        }
2626    }
2627
2628    /// Test single aggregate function returns single row
2629    #[test]
2630    fn test_single_aggregate_single_row() {
2631        let table = create_test_stock_data();
2632        let engine = QueryEngine::new();
2633
2634        let result = engine
2635            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2636            .expect("Query should succeed");
2637
2638        assert_eq!(
2639            result.row_count(),
2640            1,
2641            "Single aggregate query should return exactly 1 row"
2642        );
2643        assert_eq!(result.column_count(), 1, "Should have 1 column");
2644
2645        let source = result.source();
2646        let row = source.get_row(0).expect("Should have first row");
2647        assert_eq!(row.values[0], DataValue::Integer(5));
2648    }
2649
2650    /// Test aggregate with WHERE clause filtering
2651    #[test]
2652    fn test_aggregate_with_where_single_row() {
2653        let table = create_test_stock_data();
2654        let engine = QueryEngine::new();
2655
2656        // Filter to only high-value stocks (>= 103.0) and aggregate
2657        let result = engine
2658            .execute(
2659                table.clone(),
2660                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2661            )
2662            .expect("Query should succeed");
2663
2664        assert_eq!(
2665            result.row_count(),
2666            1,
2667            "Filtered aggregate query should return exactly 1 row"
2668        );
2669        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2670
2671        let source = result.source();
2672        let row = source.get_row(0).expect("Should have first row");
2673
2674        // Should find 2 rows (103.5 and 105.0)
2675        assert_eq!(row.values[0], DataValue::Integer(2));
2676        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2677        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2678    }
2679
2680    #[test]
2681    fn test_not_in_parsing() {
2682        use crate::sql::recursive_parser::Parser;
2683
2684        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2685        println!("\n=== Testing NOT IN parsing ===");
2686        println!("Parsing query: {query}");
2687
2688        let mut parser = Parser::new(query);
2689        match parser.parse() {
2690            Ok(statement) => {
2691                println!("Parsed statement: {statement:#?}");
2692                if let Some(where_clause) = statement.where_clause {
2693                    println!("WHERE conditions: {:#?}", where_clause.conditions);
2694                    if let Some(first_condition) = where_clause.conditions.first() {
2695                        println!("First condition expression: {:#?}", first_condition.expr);
2696                    }
2697                }
2698            }
2699            Err(e) => {
2700                panic!("Parse error: {e}");
2701            }
2702        }
2703    }
2704
2705    /// Create test stock data for aggregate testing
2706    fn create_test_stock_data() -> Arc<DataTable> {
2707        let mut table = DataTable::new("stock");
2708
2709        table.add_column(DataColumn::new("symbol"));
2710        table.add_column(DataColumn::new("close"));
2711        table.add_column(DataColumn::new("volume"));
2712
2713        // Add 5 rows of test data
2714        let test_data = vec![
2715            ("AAPL", 99.5, 1000),
2716            ("AAPL", 101.2, 1500),
2717            ("AAPL", 103.5, 2000),
2718            ("AAPL", 105.0, 1200),
2719            ("AAPL", 102.8, 1800),
2720        ];
2721
2722        for (symbol, close, volume) in test_data {
2723            table
2724                .add_row(DataRow::new(vec![
2725                    DataValue::String(symbol.to_string()),
2726                    DataValue::Float(close),
2727                    DataValue::Integer(volume),
2728                ]))
2729                .expect("Should add row successfully");
2730        }
2731
2732        Arc::new(table)
2733    }
2734}
2735
// Additional tests for this module, loaded from `query_engine_tests.rs` via
// the `#[path]` attribute and compiled only in test builds (`cfg(test)`).
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;