sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::recursive_parser::{
27    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
28    TableFunction,
29};
30
/// Scope information carried through the execution of a single query.
///
/// At present the only state is the table-alias map, which lets qualified
/// column references such as `t.amount` be traced back to the table or CTE
/// that the alias `t` stands for.
#[derive(Clone, Debug)]
pub struct ExecutionContext {
    /// Alias -> actual table/CTE name.
    /// Example: "t" -> "#tmp_trades", "a" -> "data"
    alias_map: HashMap<String, String>,
}
38
39impl ExecutionContext {
40    /// Create a new empty execution context
41    pub fn new() -> Self {
42        Self {
43            alias_map: HashMap::new(),
44        }
45    }
46
47    /// Register a table alias
48    pub fn register_alias(&mut self, alias: String, table_name: String) {
49        debug!("Registering alias: {} -> {}", alias, table_name);
50        self.alias_map.insert(alias, table_name);
51    }
52
53    /// Resolve an alias to its actual table name
54    /// Returns the alias itself if not found in the map
55    pub fn resolve_alias(&self, name: &str) -> String {
56        self.alias_map
57            .get(name)
58            .cloned()
59            .unwrap_or_else(|| name.to_string())
60    }
61
62    /// Check if a name is a registered alias
63    pub fn is_alias(&self, name: &str) -> bool {
64        self.alias_map.contains_key(name)
65    }
66
67    /// Get a copy of all registered aliases
68    pub fn get_aliases(&self) -> HashMap<String, String> {
69        self.alias_map.clone()
70    }
71
72    /// Resolve a column reference to its index in the table, handling aliases
73    ///
74    /// This is the unified column resolution function that should be used by all
75    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
76    /// alias resolution behavior.
77    ///
78    /// Resolution strategy:
79    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
80    ///    a. Resolve the alias: t -> actual_table_name
81    ///    b. Try qualified lookup: "actual_table_name.amount"
82    ///    c. Fall back to unqualified: "amount"
83    /// 2. If column_ref has no prefix:
84    ///    a. Try simple column name lookup: "amount"
85    ///    b. Try as qualified name if it contains a dot: "table.column"
86    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
87        if let Some(table_prefix) = &column_ref.table_prefix {
88            // Qualified column reference: resolve the alias first
89            let actual_table = self.resolve_alias(table_prefix);
90
91            // Try qualified lookup: "actual_table.column"
92            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
93            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
94                debug!(
95                    "Resolved {}.{} -> qualified column '{}' at index {}",
96                    table_prefix, column_ref.name, qualified_name, idx
97                );
98                return Ok(idx);
99            }
100
101            // Fall back to unqualified lookup
102            if let Some(idx) = table.get_column_index(&column_ref.name) {
103                debug!(
104                    "Resolved {}.{} -> unqualified column '{}' at index {}",
105                    table_prefix, column_ref.name, column_ref.name, idx
106                );
107                return Ok(idx);
108            }
109
110            // Not found with either qualified or unqualified name
111            Err(anyhow!(
112                "Column '{}' not found. Table '{}' may not support qualified column names",
113                qualified_name,
114                actual_table
115            ))
116        } else {
117            // Unqualified column reference
118            if let Some(idx) = table.get_column_index(&column_ref.name) {
119                debug!(
120                    "Resolved unqualified column '{}' at index {}",
121                    column_ref.name, idx
122                );
123                return Ok(idx);
124            }
125
126            // If the column name contains a dot, try it as a qualified name
127            if column_ref.name.contains('.') {
128                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
129                    debug!(
130                        "Resolved '{}' as qualified column at index {}",
131                        column_ref.name, idx
132                    );
133                    return Ok(idx);
134                }
135            }
136
137            // Column not found - provide helpful error
138            let suggestion = self.find_similar_column(table, &column_ref.name);
139            match suggestion {
140                Some(similar) => Err(anyhow!(
141                    "Column '{}' not found. Did you mean '{}'?",
142                    column_ref.name,
143                    similar
144                )),
145                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
146            }
147        }
148    }
149
150    /// Find a similar column name using edit distance (for better error messages)
151    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
152        let columns = table.column_names();
153        let mut best_match: Option<(String, usize)> = None;
154
155        for col in columns {
156            let distance = edit_distance(name, &col);
157            if distance <= 2 {
158                // Allow up to 2 character differences
159                match best_match {
160                    Some((_, best_dist)) if distance < best_dist => {
161                        best_match = Some((col.clone(), distance));
162                    }
163                    None => {
164                        best_match = Some((col.clone(), distance));
165                    }
166                    _ => {}
167                }
168            }
169        }
170
171        best_match.map(|(name, _)| name)
172    }
173}
174
175impl Default for ExecutionContext {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
/// Levenshtein distance between `a` and `b`, counted in `char`s.
///
/// Each insertion, deletion or substitution of one character costs 1.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // Against an empty string the distance is simply the other length.
    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only the previous and the current row of the DP table are needed.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, &from) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, &to) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(from != to);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = deletion.min(insertion).min(substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last computed row sits in `previous`.
    previous[target.len()]
}
221
222/// Query engine that executes SQL directly on `DataTable`
223#[derive(Clone)]
224pub struct QueryEngine {
225    case_insensitive: bool,
226    date_notation: String,
227    _behavior_config: Option<BehaviorConfig>,
228}
229
230impl Default for QueryEngine {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236impl QueryEngine {
237    #[must_use]
238    pub fn new() -> Self {
239        Self {
240            case_insensitive: false,
241            date_notation: get_date_notation(),
242            _behavior_config: None,
243        }
244    }
245
246    #[must_use]
247    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
248        let case_insensitive = config.case_insensitive_default;
249        // Use get_date_notation() to respect environment variable override
250        let date_notation = get_date_notation();
251        Self {
252            case_insensitive,
253            date_notation,
254            _behavior_config: Some(config),
255        }
256    }
257
258    #[must_use]
259    pub fn with_date_notation(_date_notation: String) -> Self {
260        Self {
261            case_insensitive: false,
262            date_notation: get_date_notation(), // Always use the global function
263            _behavior_config: None,
264        }
265    }
266
267    #[must_use]
268    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
269        Self {
270            case_insensitive,
271            date_notation: get_date_notation(),
272            _behavior_config: None,
273        }
274    }
275
276    #[must_use]
277    pub fn with_case_insensitive_and_date_notation(
278        case_insensitive: bool,
279        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
280    ) -> Self {
281        Self {
282            case_insensitive,
283            date_notation: get_date_notation(), // Always use the global function
284            _behavior_config: None,
285        }
286    }
287
288    /// Find a column name similar to the given name using edit distance
289    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
290        let columns = table.column_names();
291        let mut best_match: Option<(String, usize)> = None;
292
293        for col in columns {
294            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
295            // Only suggest if distance is small (likely a typo)
296            // Allow up to 3 edits for longer names
297            let max_distance = if name.len() > 10 { 3 } else { 2 };
298            if distance <= max_distance {
299                match &best_match {
300                    None => best_match = Some((col, distance)),
301                    Some((_, best_dist)) if distance < *best_dist => {
302                        best_match = Some((col, distance));
303                    }
304                    _ => {}
305                }
306            }
307        }
308
309        best_match.map(|(name, _)| name)
310    }
311
312    /// Calculate Levenshtein edit distance between two strings
313    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
314        let len1 = s1.len();
315        let len2 = s2.len();
316        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
317
318        for i in 0..=len1 {
319            matrix[i][0] = i;
320        }
321        for j in 0..=len2 {
322            matrix[0][j] = j;
323        }
324
325        for (i, c1) in s1.chars().enumerate() {
326            for (j, c2) in s2.chars().enumerate() {
327                let cost = usize::from(c1 != c2);
328                matrix[i + 1][j + 1] = std::cmp::min(
329                    matrix[i][j + 1] + 1, // deletion
330                    std::cmp::min(
331                        matrix[i + 1][j] + 1, // insertion
332                        matrix[i][j] + cost,  // substitution
333                    ),
334                );
335            }
336        }
337
338        matrix[len1][len2]
339    }
340
341    /// Check if an expression contains UNNEST function call
342    fn contains_unnest(expr: &SqlExpression) -> bool {
343        match expr {
344            // Direct UNNEST variant
345            SqlExpression::Unnest { .. } => true,
346            SqlExpression::FunctionCall { name, args, .. } => {
347                if name.to_uppercase() == "UNNEST" {
348                    return true;
349                }
350                // Check recursively in function arguments
351                args.iter().any(Self::contains_unnest)
352            }
353            SqlExpression::BinaryOp { left, right, .. } => {
354                Self::contains_unnest(left) || Self::contains_unnest(right)
355            }
356            SqlExpression::Not { expr } => Self::contains_unnest(expr),
357            SqlExpression::CaseExpression {
358                when_branches,
359                else_branch,
360            } => {
361                when_branches.iter().any(|branch| {
362                    Self::contains_unnest(&branch.condition)
363                        || Self::contains_unnest(&branch.result)
364                }) || else_branch
365                    .as_ref()
366                    .map_or(false, |e| Self::contains_unnest(e))
367            }
368            SqlExpression::SimpleCaseExpression {
369                expr,
370                when_branches,
371                else_branch,
372            } => {
373                Self::contains_unnest(expr)
374                    || when_branches.iter().any(|branch| {
375                        Self::contains_unnest(&branch.value)
376                            || Self::contains_unnest(&branch.result)
377                    })
378                    || else_branch
379                        .as_ref()
380                        .map_or(false, |e| Self::contains_unnest(e))
381            }
382            SqlExpression::InList { expr, values } => {
383                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
384            }
385            SqlExpression::NotInList { expr, values } => {
386                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
387            }
388            SqlExpression::Between { expr, lower, upper } => {
389                Self::contains_unnest(expr)
390                    || Self::contains_unnest(lower)
391                    || Self::contains_unnest(upper)
392            }
393            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
394            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
395            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
396            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
397            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
398            SqlExpression::ChainedMethodCall { base, args, .. } => {
399                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
400            }
401            _ => false,
402        }
403    }
404
405    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
406    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407        let (view, _plan) = self.execute_with_plan(table, sql)?;
408        Ok(view)
409    }
410
411    /// Execute a SQL query with optional temp table registry access
412    pub fn execute_with_temp_tables(
413        &self,
414        table: Arc<DataTable>,
415        sql: &str,
416        temp_tables: Option<&TempTableRegistry>,
417    ) -> Result<DataView> {
418        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419        Ok(view)
420    }
421
422    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
423    pub fn execute_statement(
424        &self,
425        table: Arc<DataTable>,
426        statement: SelectStatement,
427    ) -> Result<DataView> {
428        self.execute_statement_with_temp_tables(table, statement, None)
429    }
430
431    /// Execute a parsed SelectStatement with optional temp table access
432    pub fn execute_statement_with_temp_tables(
433        &self,
434        table: Arc<DataTable>,
435        statement: SelectStatement,
436        temp_tables: Option<&TempTableRegistry>,
437    ) -> Result<DataView> {
438        // First process CTEs to build context
439        let mut cte_context = HashMap::new();
440
441        // Add temp tables to CTE context if provided
442        if let Some(temp_registry) = temp_tables {
443            for table_name in temp_registry.list_tables() {
444                if let Some(temp_table) = temp_registry.get(&table_name) {
445                    debug!("Adding temp table {} to CTE context", table_name);
446                    let view = DataView::new(temp_table);
447                    cte_context.insert(table_name, Arc::new(view));
448                }
449            }
450        }
451
452        for cte in &statement.ctes {
453            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454            // Execute the CTE based on its type
455            let cte_result = match &cte.cte_type {
456                CTEType::Standard(query) => {
457                    // Execute the CTE query (it might reference earlier CTEs)
458                    let view = self.build_view_with_context(
459                        table.clone(),
460                        query.clone(),
461                        &mut cte_context,
462                    )?;
463
464                    // Materialize the view and enrich columns with qualified names
465                    let mut materialized = self.materialize_view(view)?;
466
467                    // Enrich columns with qualified names for proper scoping
468                    for column in materialized.columns_mut() {
469                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470                        column.source_table = Some(cte.name.clone());
471                    }
472
473                    DataView::new(Arc::new(materialized))
474                }
475                CTEType::Web(web_spec) => {
476                    // Fetch data from URL
477                    use crate::web::http_fetcher::WebDataFetcher;
478
479                    let fetcher = WebDataFetcher::new()?;
480                    // Pass None for query context (no full SQL available in these contexts)
481                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483                    // Enrich columns with qualified names for proper scoping
484                    for column in data_table.columns_mut() {
485                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486                        column.source_table = Some(cte.name.clone());
487                    }
488
489                    // Convert DataTable to DataView
490                    DataView::new(Arc::new(data_table))
491                }
492            };
493            // Store the result in the context for later use
494            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495            debug!(
496                "QueryEngine: CTE '{}' pre-processed, stored in context",
497                cte.name
498            );
499        }
500
501        // Now process subqueries with CTE context available
502        let mut subquery_executor =
503            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506        // Build the view with the same CTE context
507        self.build_view_with_context(table, processed_statement, &mut cte_context)
508    }
509
510    /// Execute a statement with provided CTE context (for subqueries)
511    pub fn execute_statement_with_cte_context(
512        &self,
513        table: Arc<DataTable>,
514        statement: SelectStatement,
515        cte_context: &HashMap<String, Arc<DataView>>,
516    ) -> Result<DataView> {
517        // Clone the context so we can add any CTEs from this statement
518        let mut local_context = cte_context.clone();
519
520        // Process any CTEs in this statement (they might be nested)
521        for cte in &statement.ctes {
522            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523            let cte_result = match &cte.cte_type {
524                CTEType::Standard(query) => {
525                    let view = self.build_view_with_context(
526                        table.clone(),
527                        query.clone(),
528                        &mut local_context,
529                    )?;
530
531                    // Materialize the view and enrich columns with qualified names
532                    let mut materialized = self.materialize_view(view)?;
533
534                    // Enrich columns with qualified names for proper scoping
535                    for column in materialized.columns_mut() {
536                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537                        column.source_table = Some(cte.name.clone());
538                    }
539
540                    DataView::new(Arc::new(materialized))
541                }
542                CTEType::Web(web_spec) => {
543                    // Fetch data from URL
544                    use crate::web::http_fetcher::WebDataFetcher;
545
546                    let fetcher = WebDataFetcher::new()?;
547                    // Pass None for query context (no full SQL available in these contexts)
548                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550                    // Enrich columns with qualified names for proper scoping
551                    for column in data_table.columns_mut() {
552                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553                        column.source_table = Some(cte.name.clone());
554                    }
555
556                    // Convert DataTable to DataView
557                    DataView::new(Arc::new(data_table))
558                }
559            };
560            local_context.insert(cte.name.clone(), Arc::new(cte_result));
561        }
562
563        // Process subqueries with the complete context
564        let mut subquery_executor =
565            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568        // Build the view
569        self.build_view_with_context(table, processed_statement, &mut local_context)
570    }
571
572    /// Execute a query and return both the result and the execution plan
573    pub fn execute_with_plan(
574        &self,
575        table: Arc<DataTable>,
576        sql: &str,
577    ) -> Result<(DataView, ExecutionPlan)> {
578        self.execute_with_plan_and_temp_tables(table, sql, None)
579    }
580
581    /// Execute a query with temp tables and return both the result and the execution plan
582    pub fn execute_with_plan_and_temp_tables(
583        &self,
584        table: Arc<DataTable>,
585        sql: &str,
586        temp_tables: Option<&TempTableRegistry>,
587    ) -> Result<(DataView, ExecutionPlan)> {
588        let mut plan_builder = ExecutionPlanBuilder::new();
589        let start_time = Instant::now();
590
591        // Parse the SQL query
592        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
593        plan_builder.add_detail(format!("Query: {}", sql));
594        let mut parser = Parser::new(sql);
595        let statement = parser
596            .parse()
597            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
598        plan_builder.add_detail(format!("Parsed successfully"));
599        if let Some(from) = &statement.from_table {
600            plan_builder.add_detail(format!("FROM: {}", from));
601        }
602        if statement.where_clause.is_some() {
603            plan_builder.add_detail("WHERE clause present".to_string());
604        }
605        plan_builder.end_step();
606
607        // First process CTEs to build context
608        let mut cte_context = HashMap::new();
609
610        // Add temp tables to CTE context if provided
611        if let Some(temp_registry) = temp_tables {
612            for table_name in temp_registry.list_tables() {
613                if let Some(temp_table) = temp_registry.get(&table_name) {
614                    debug!("Adding temp table {} to CTE context", table_name);
615                    let view = DataView::new(temp_table);
616                    cte_context.insert(table_name, Arc::new(view));
617                }
618            }
619        }
620
621        if !statement.ctes.is_empty() {
622            plan_builder.begin_step(
623                StepType::CTE,
624                format!("Process {} CTEs", statement.ctes.len()),
625            );
626
627            for cte in &statement.ctes {
628                let cte_start = Instant::now();
629                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
630
631                let cte_result = match &cte.cte_type {
632                    CTEType::Standard(query) => {
633                        // Add CTE query details
634                        if let Some(from) = &query.from_table {
635                            plan_builder.add_detail(format!("Source: {}", from));
636                        }
637                        if query.where_clause.is_some() {
638                            plan_builder.add_detail("Has WHERE clause".to_string());
639                        }
640                        if query.group_by.is_some() {
641                            plan_builder.add_detail("Has GROUP BY".to_string());
642                        }
643
644                        debug!(
645                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
646                            cte.name,
647                            cte_context.keys().collect::<Vec<_>>()
648                        );
649
650                        // Process subqueries in the CTE's query FIRST
651                        // This allows the subqueries to see all previously defined CTEs
652                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
653                            self.clone(),
654                            table.clone(),
655                            cte_context.clone(),
656                        );
657                        let processed_query = subquery_executor.execute_subqueries(query)?;
658
659                        let view = self.build_view_with_context(
660                            table.clone(),
661                            processed_query,
662                            &mut cte_context,
663                        )?;
664
665                        // Materialize the view and enrich columns with qualified names
666                        let mut materialized = self.materialize_view(view)?;
667
668                        // Enrich columns with qualified names for proper scoping
669                        for column in materialized.columns_mut() {
670                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
671                            column.source_table = Some(cte.name.clone());
672                        }
673
674                        DataView::new(Arc::new(materialized))
675                    }
676                    CTEType::Web(web_spec) => {
677                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
678                        if let Some(format) = &web_spec.format {
679                            plan_builder.add_detail(format!("Format: {:?}", format));
680                        }
681                        if let Some(cache) = web_spec.cache_seconds {
682                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
683                        }
684
685                        // Fetch data from URL
686                        use crate::web::http_fetcher::WebDataFetcher;
687
688                        let fetcher = WebDataFetcher::new()?;
689                        // Pass None for query context - each WEB CTE is independent
690                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
691
692                        // Enrich columns with qualified names for proper scoping
693                        for column in data_table.columns_mut() {
694                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
695                            column.source_table = Some(cte.name.clone());
696                        }
697
698                        // Convert DataTable to DataView
699                        DataView::new(Arc::new(data_table))
700                    }
701                };
702
703                // Record CTE statistics
704                plan_builder.set_rows_out(cte_result.row_count());
705                plan_builder.add_detail(format!(
706                    "Result: {} rows, {} columns",
707                    cte_result.row_count(),
708                    cte_result.column_count()
709                ));
710                plan_builder.add_detail(format!(
711                    "Execution time: {:.3}ms",
712                    cte_start.elapsed().as_secs_f64() * 1000.0
713                ));
714
715                debug!(
716                    "QueryEngine: Storing CTE '{}' in context with {} rows",
717                    cte.name,
718                    cte_result.row_count()
719                );
720                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
721                plan_builder.end_step();
722            }
723
724            plan_builder.add_detail(format!(
725                "All {} CTEs cached in context",
726                statement.ctes.len()
727            ));
728            plan_builder.end_step();
729        }
730
731        // Process subqueries in the statement with CTE context
732        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
733        let mut subquery_executor =
734            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
735
736        // Check if there are subqueries to process
737        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
738            // This is a simplified check - in reality we'd need to walk the AST
739            format!("{:?}", w).contains("Subquery")
740        });
741
742        if has_subqueries {
743            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
744        }
745
746        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
747
748        if has_subqueries {
749            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
750        } else {
751            plan_builder.add_detail("No subqueries to process".to_string());
752        }
753
754        plan_builder.end_step();
755        let result = self.build_view_with_context_and_plan(
756            table,
757            processed_statement,
758            &mut cte_context,
759            &mut plan_builder,
760        )?;
761
762        let total_duration = start_time.elapsed();
763        info!(
764            "Query execution complete: total={:?}, rows={}",
765            total_duration,
766            result.row_count()
767        );
768
769        let plan = plan_builder.build();
770        Ok((result, plan))
771    }
772
773    /// Build a `DataView` from a parsed SQL statement
774    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775        let mut cte_context = HashMap::new();
776        self.build_view_with_context(table, statement, &mut cte_context)
777    }
778
779    /// Build a DataView from a SelectStatement with CTE context
780    fn build_view_with_context(
781        &self,
782        table: Arc<DataTable>,
783        statement: SelectStatement,
784        cte_context: &mut HashMap<String, Arc<DataView>>,
785    ) -> Result<DataView> {
786        let mut dummy_plan = ExecutionPlanBuilder::new();
787        let mut exec_context = ExecutionContext::new();
788        self.build_view_with_context_and_plan_and_exec(
789            table,
790            statement,
791            cte_context,
792            &mut dummy_plan,
793            &mut exec_context,
794        )
795    }
796
797    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
798    fn build_view_with_context_and_plan(
799        &self,
800        table: Arc<DataTable>,
801        statement: SelectStatement,
802        cte_context: &mut HashMap<String, Arc<DataView>>,
803        plan: &mut ExecutionPlanBuilder,
804    ) -> Result<DataView> {
805        let mut exec_context = ExecutionContext::new();
806        self.build_view_with_context_and_plan_and_exec(
807            table,
808            statement,
809            cte_context,
810            plan,
811            &mut exec_context,
812        )
813    }
814
815    /// Build a DataView with CTE context, execution plan, and alias resolution context
816    fn build_view_with_context_and_plan_and_exec(
817        &self,
818        table: Arc<DataTable>,
819        statement: SelectStatement,
820        cte_context: &mut HashMap<String, Arc<DataView>>,
821        plan: &mut ExecutionPlanBuilder,
822        exec_context: &mut ExecutionContext,
823    ) -> Result<DataView> {
824        // First, process any CTEs that aren't already in the context
825        for cte in &statement.ctes {
826            // Skip if already processed (e.g., by execute_select for WEB CTEs)
827            if cte_context.contains_key(&cte.name) {
828                debug!(
829                    "QueryEngine: CTE '{}' already in context, skipping",
830                    cte.name
831                );
832                continue;
833            }
834
835            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836            debug!(
837                "QueryEngine: Available CTEs for '{}': {:?}",
838                cte.name,
839                cte_context.keys().collect::<Vec<_>>()
840            );
841
842            // Execute the CTE query (it might reference earlier CTEs)
843            let cte_result = match &cte.cte_type {
844                CTEType::Standard(query) => {
845                    let view =
846                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848                    // Materialize the view and enrich columns with qualified names
849                    let mut materialized = self.materialize_view(view)?;
850
851                    // Enrich columns with qualified names for proper scoping
852                    for column in materialized.columns_mut() {
853                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854                        column.source_table = Some(cte.name.clone());
855                    }
856
857                    DataView::new(Arc::new(materialized))
858                }
859                CTEType::Web(_web_spec) => {
860                    // Web CTEs should have been processed earlier in execute_select
861                    return Err(anyhow!(
862                        "Web CTEs should be processed in execute_select method"
863                    ));
864                }
865            };
866
867            // Store the result in the context for later use
868            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869            debug!(
870                "QueryEngine: CTE '{}' processed, stored in context",
871                cte.name
872            );
873        }
874
875        // Determine the source table for the main query
876        let source_table = if let Some(ref table_func) = statement.from_function {
877            // Handle table functions like RANGE()
878            debug!("QueryEngine: Processing table function...");
879            match table_func {
880                TableFunction::Generator { name, args } => {
881                    // Use the generator registry to create the table
882                    use crate::sql::generators::GeneratorRegistry;
883
884                    // Create generator registry (could be cached in QueryEngine)
885                    let registry = GeneratorRegistry::new();
886
887                    if let Some(generator) = registry.get(name) {
888                        // Evaluate arguments
889                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
890                            &table,
891                            self.date_notation.clone(),
892                        );
893                        let dummy_row = 0;
894
895                        let mut evaluated_args = Vec::new();
896                        for arg in args {
897                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898                        }
899
900                        // Generate the table
901                        generator.generate(evaluated_args)?
902                    } else {
903                        return Err(anyhow!("Unknown generator function: {}", name));
904                    }
905                }
906            }
907        } else if let Some(ref subquery) = statement.from_subquery {
908            // Execute the subquery and use its result as the source
909            debug!("QueryEngine: Processing FROM subquery...");
910            let subquery_result =
911                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913            // Convert the DataView to a DataTable for use as source
914            // This materializes the subquery result
915            let materialized = self.materialize_view(subquery_result)?;
916            Arc::new(materialized)
917        } else if let Some(ref table_name) = statement.from_table {
918            // Check if this references a CTE
919            if let Some(cte_view) = cte_context.get(table_name) {
920                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921                // Materialize the CTE view as a table
922                let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924                // Apply alias to qualified column names if present
925                if let Some(ref alias) = statement.from_alias {
926                    debug!(
927                        "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928                        alias, table_name
929                    );
930                    for column in materialized.columns_mut() {
931                        // Replace the CTE name with the alias in qualified names
932                        if let Some(ref qualified_name) = column.qualified_name {
933                            if qualified_name.starts_with(&format!("{}.", table_name)) {
934                                column.qualified_name =
935                                    Some(qualified_name.replace(
936                                        &format!("{}.", table_name),
937                                        &format!("{}.", alias),
938                                    ));
939                            }
940                        }
941                        // Update source table to reflect the alias
942                        if column.source_table.as_ref() == Some(table_name) {
943                            column.source_table = Some(alias.clone());
944                        }
945                    }
946                }
947
948                Arc::new(materialized)
949            } else {
950                // Regular table reference - use the provided table
951                table.clone()
952            }
953        } else {
954            // No FROM clause - use the provided table
955            table.clone()
956        };
957
958        // Register alias in execution context if present
959        if let Some(ref alias) = statement.from_alias {
960            if let Some(ref table_name) = statement.from_table {
961                exec_context.register_alias(alias.clone(), table_name.clone());
962            }
963        }
964
965        // Process JOINs if present
966        let final_table = if !statement.joins.is_empty() {
967            plan.begin_step(
968                StepType::Join,
969                format!("Process {} JOINs", statement.joins.len()),
970            );
971            plan.set_rows_in(source_table.row_count());
972
973            let join_executor = HashJoinExecutor::new(self.case_insensitive);
974            let mut current_table = source_table;
975
976            for (idx, join_clause) in statement.joins.iter().enumerate() {
977                let join_start = Instant::now();
978                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981                plan.add_detail(format!(
982                    "Executing {:?} JOIN on {} condition(s)",
983                    join_clause.join_type,
984                    join_clause.condition.conditions.len()
985                ));
986
987                // Resolve the right table for the join
988                let right_table = match &join_clause.table {
989                    TableSource::Table(name) => {
990                        // Check if it's a CTE reference
991                        if let Some(cte_view) = cte_context.get(name) {
992                            let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994                            // Apply alias to qualified column names if present
995                            if let Some(ref alias) = join_clause.alias {
996                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997                                for column in materialized.columns_mut() {
998                                    // Replace the CTE name with the alias in qualified names
999                                    if let Some(ref qualified_name) = column.qualified_name {
1000                                        if qualified_name.starts_with(&format!("{}.", name)) {
1001                                            column.qualified_name = Some(qualified_name.replace(
1002                                                &format!("{}.", name),
1003                                                &format!("{}.", alias),
1004                                            ));
1005                                        }
1006                                    }
1007                                    // Update source table to reflect the alias
1008                                    if column.source_table.as_ref() == Some(name) {
1009                                        column.source_table = Some(alias.clone());
1010                                    }
1011                                }
1012                            }
1013
1014                            Arc::new(materialized)
1015                        } else {
1016                            // For now, we need the actual table data
1017                            // In a real implementation, this would load from file
1018                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019                        }
1020                    }
1021                    TableSource::DerivedTable { query, alias: _ } => {
1022                        // Execute the subquery
1023                        let subquery_result = self.build_view_with_context(
1024                            table.clone(),
1025                            *query.clone(),
1026                            cte_context,
1027                        )?;
1028                        let materialized = self.materialize_view(subquery_result)?;
1029                        Arc::new(materialized)
1030                    }
1031                };
1032
1033                // Execute the join
1034                let joined = join_executor.execute_join(
1035                    current_table.clone(),
1036                    join_clause,
1037                    right_table.clone(),
1038                )?;
1039
1040                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041                plan.set_rows_out(joined.row_count());
1042                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043                plan.add_detail(format!(
1044                    "Join time: {:.3}ms",
1045                    join_start.elapsed().as_secs_f64() * 1000.0
1046                ));
1047                plan.end_step();
1048
1049                current_table = Arc::new(joined);
1050            }
1051
1052            plan.set_rows_out(current_table.row_count());
1053            plan.add_detail(format!(
1054                "Final result after all joins: {} rows",
1055                current_table.row_count()
1056            ));
1057            plan.end_step();
1058            current_table
1059        } else {
1060            source_table
1061        };
1062
1063        // Continue with the existing build_view logic but using final_table
1064        self.build_view_internal_with_plan_and_exec(
1065            final_table,
1066            statement,
1067            plan,
1068            Some(exec_context),
1069        )
1070    }
1071
1072    /// Materialize a DataView into a new DataTable
1073    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074        let source = view.source();
1075        let mut result_table = DataTable::new("derived");
1076
1077        // Get the visible columns from the view
1078        let visible_cols = view.visible_column_indices().to_vec();
1079
1080        // Copy column definitions
1081        for col_idx in &visible_cols {
1082            let col = &source.columns[*col_idx];
1083            let new_col = DataColumn {
1084                name: col.name.clone(),
1085                data_type: col.data_type.clone(),
1086                nullable: col.nullable,
1087                unique_values: col.unique_values,
1088                null_count: col.null_count,
1089                metadata: col.metadata.clone(),
1090                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1091                source_table: col.source_table.clone(),     // Preserve source table
1092            };
1093            result_table.add_column(new_col);
1094        }
1095
1096        // Copy visible rows
1097        for row_idx in view.visible_row_indices() {
1098            let source_row = &source.rows[*row_idx];
1099            let mut new_row = DataRow { values: Vec::new() };
1100
1101            for col_idx in &visible_cols {
1102                new_row.values.push(source_row.values[*col_idx].clone());
1103            }
1104
1105            result_table.add_row(new_row);
1106        }
1107
1108        Ok(result_table)
1109    }
1110
1111    fn build_view_internal(
1112        &self,
1113        table: Arc<DataTable>,
1114        statement: SelectStatement,
1115    ) -> Result<DataView> {
1116        let mut dummy_plan = ExecutionPlanBuilder::new();
1117        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118    }
1119
1120    fn build_view_internal_with_plan(
1121        &self,
1122        table: Arc<DataTable>,
1123        statement: SelectStatement,
1124        plan: &mut ExecutionPlanBuilder,
1125    ) -> Result<DataView> {
1126        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1127    }
1128
1129    fn build_view_internal_with_plan_and_exec(
1130        &self,
1131        table: Arc<DataTable>,
1132        statement: SelectStatement,
1133        plan: &mut ExecutionPlanBuilder,
1134        exec_context: Option<&ExecutionContext>,
1135    ) -> Result<DataView> {
1136        debug!(
1137            "QueryEngine::build_view - select_items: {:?}",
1138            statement.select_items
1139        );
1140        debug!(
1141            "QueryEngine::build_view - where_clause: {:?}",
1142            statement.where_clause
1143        );
1144
1145        // Start with all rows visible
1146        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148        // Apply WHERE clause filtering using recursive evaluator
1149        if let Some(where_clause) = &statement.where_clause {
1150            let total_rows = table.row_count();
1151            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155            plan.set_rows_in(total_rows);
1156            plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158            // Add details about WHERE conditions
1159            for condition in &where_clause.conditions {
1160                plan.add_detail(format!("Condition: {:?}", condition.expr));
1161            }
1162
1163            let filter_start = Instant::now();
1164            // Create an evaluation context for caching compiled regexes
1165            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167            // Create evaluator ONCE before the loop for performance
1168            let mut evaluator = if let Some(exec_ctx) = exec_context {
1169                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1170                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1171            } else {
1172                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1173            };
1174
1175            // Filter visible rows based on WHERE clause
1176            let mut filtered_rows = Vec::new();
1177            for row_idx in visible_rows {
1178                // Only log for first few rows to avoid performance impact
1179                if row_idx < 3 {
1180                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1181                }
1182
1183                match evaluator.evaluate(where_clause, row_idx) {
1184                    Ok(result) => {
1185                        if row_idx < 3 {
1186                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1187                        }
1188                        if result {
1189                            filtered_rows.push(row_idx);
1190                        }
1191                    }
1192                    Err(e) => {
1193                        if row_idx < 3 {
1194                            debug!(
1195                                "QueryEngine: WHERE evaluation error for row {}: {}",
1196                                row_idx, e
1197                            );
1198                        }
1199                        // Propagate WHERE clause errors instead of silently ignoring them
1200                        return Err(e);
1201                    }
1202                }
1203            }
1204
1205            // Log regex cache statistics
1206            let (compilations, cache_hits) = eval_context.get_stats();
1207            if compilations > 0 || cache_hits > 0 {
1208                debug!(
1209                    "LIKE pattern cache: {} compilations, {} cache hits",
1210                    compilations, cache_hits
1211                );
1212            }
1213            visible_rows = filtered_rows;
1214            let filter_duration = filter_start.elapsed();
1215            info!(
1216                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1217                total_rows,
1218                visible_rows.len(),
1219                filter_duration
1220            );
1221
1222            plan.set_rows_out(visible_rows.len());
1223            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1224            plan.add_detail(format!(
1225                "Filter time: {:.3}ms",
1226                filter_duration.as_secs_f64() * 1000.0
1227            ));
1228            plan.end_step();
1229        }
1230
1231        // Create initial DataView with filtered rows
1232        let mut view = DataView::new(table.clone());
1233        view = view.with_rows(visible_rows);
1234
1235        // Handle GROUP BY if present
1236        if let Some(group_by_exprs) = &statement.group_by {
1237            if !group_by_exprs.is_empty() {
1238                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1239
1240                plan.begin_step(
1241                    StepType::GroupBy,
1242                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1243                );
1244                plan.set_rows_in(view.row_count());
1245                plan.add_detail(format!("Input: {} rows", view.row_count()));
1246                for expr in group_by_exprs {
1247                    plan.add_detail(format!("Group by: {:?}", expr));
1248                }
1249
1250                let group_start = Instant::now();
1251                view = self.apply_group_by(
1252                    view,
1253                    group_by_exprs,
1254                    &statement.select_items,
1255                    statement.having.as_ref(),
1256                    plan,
1257                )?;
1258
1259                plan.set_rows_out(view.row_count());
1260                plan.add_detail(format!("Output: {} groups", view.row_count()));
1261                plan.add_detail(format!(
1262                    "Overall time: {:.3}ms",
1263                    group_start.elapsed().as_secs_f64() * 1000.0
1264                ));
1265                plan.end_step();
1266            }
1267        } else {
1268            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1269            if !statement.select_items.is_empty() {
1270                // Check if we have ANY non-star items (not just the first one)
1271                let has_non_star_items = statement
1272                    .select_items
1273                    .iter()
1274                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1275
1276                // Apply select items if:
1277                // 1. We have computed expressions or explicit columns
1278                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1279                if has_non_star_items || statement.select_items.len() > 1 {
1280                    view = self.apply_select_items(
1281                        view,
1282                        &statement.select_items,
1283                        &statement,
1284                        exec_context,
1285                    )?;
1286                }
1287                // If it's just a single star, no projection needed
1288            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1289                debug!("QueryEngine: Using legacy columns path");
1290                // Fallback to legacy column projection for backward compatibility
1291                // Use the current view's source table, not the original table
1292                let source_table = view.source();
1293                let column_indices =
1294                    self.resolve_column_indices(source_table, &statement.columns)?;
1295                view = view.with_columns(column_indices);
1296            }
1297        }
1298
1299        // Apply DISTINCT if specified
1300        if statement.distinct {
1301            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1302            plan.set_rows_in(view.row_count());
1303            plan.add_detail(format!("Input: {} rows", view.row_count()));
1304
1305            let distinct_start = Instant::now();
1306            view = self.apply_distinct(view)?;
1307
1308            plan.set_rows_out(view.row_count());
1309            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1310            plan.add_detail(format!(
1311                "Distinct time: {:.3}ms",
1312                distinct_start.elapsed().as_secs_f64() * 1000.0
1313            ));
1314            plan.end_step();
1315        }
1316
1317        // Apply ORDER BY sorting
1318        if let Some(order_by_columns) = &statement.order_by {
1319            if !order_by_columns.is_empty() {
1320                plan.begin_step(
1321                    StepType::Sort,
1322                    format!("ORDER BY {} columns", order_by_columns.len()),
1323                );
1324                plan.set_rows_in(view.row_count());
1325                for col in order_by_columns {
1326                    // Format the expression (simplified for now - just show column name or "expr")
1327                    let expr_str = match &col.expr {
1328                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1329                        _ => "expr".to_string(),
1330                    };
1331                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1332                }
1333
1334                let sort_start = Instant::now();
1335                view =
1336                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1337
1338                plan.add_detail(format!(
1339                    "Sort time: {:.3}ms",
1340                    sort_start.elapsed().as_secs_f64() * 1000.0
1341                ));
1342                plan.end_step();
1343            }
1344        }
1345
1346        // Apply LIMIT/OFFSET
1347        if let Some(limit) = statement.limit {
1348            let offset = statement.offset.unwrap_or(0);
1349            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1350            plan.set_rows_in(view.row_count());
1351            if offset > 0 {
1352                plan.add_detail(format!("OFFSET: {}", offset));
1353            }
1354            view = view.with_limit(limit, offset);
1355            plan.set_rows_out(view.row_count());
1356            plan.add_detail(format!("Output: {} rows", view.row_count()));
1357            plan.end_step();
1358        }
1359
1360        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1361        if !statement.set_operations.is_empty() {
1362            plan.begin_step(
1363                StepType::SetOperation,
1364                format!("Process {} set operations", statement.set_operations.len()),
1365            );
1366            plan.set_rows_in(view.row_count());
1367
1368            // Materialize the first result set
1369            let mut combined_table = self.materialize_view(view)?;
1370            let first_columns = combined_table.column_names();
1371            let first_column_count = first_columns.len();
1372
1373            // Track if any operation requires deduplication
1374            let mut needs_deduplication = false;
1375
1376            // Process each set operation
1377            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1378                let op_start = Instant::now();
1379                plan.begin_step(
1380                    StepType::SetOperation,
1381                    format!("{:?} operation #{}", operation, idx + 1),
1382                );
1383
1384                // Execute the next SELECT statement
1385                // We need to pass the original table and exec_context for proper resolution
1386                let next_view = if let Some(exec_ctx) = exec_context {
1387                    self.build_view_internal_with_plan_and_exec(
1388                        table.clone(),
1389                        *next_statement.clone(),
1390                        plan,
1391                        Some(exec_ctx),
1392                    )?
1393                } else {
1394                    self.build_view_internal_with_plan(
1395                        table.clone(),
1396                        *next_statement.clone(),
1397                        plan,
1398                    )?
1399                };
1400
1401                // Materialize the next result set
1402                let next_table = self.materialize_view(next_view)?;
1403                let next_columns = next_table.column_names();
1404                let next_column_count = next_columns.len();
1405
1406                // Validate schema compatibility
1407                if first_column_count != next_column_count {
1408                    return Err(anyhow!(
1409                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1410                        first_column_count,
1411                        idx + 2,
1412                        next_column_count
1413                    ));
1414                }
1415
1416                // Warn if column names don't match (but allow it - some SQL dialects do)
1417                for (col_idx, (first_col, next_col)) in
1418                    first_columns.iter().zip(next_columns.iter()).enumerate()
1419                {
1420                    if !first_col.eq_ignore_ascii_case(next_col) {
1421                        debug!(
1422                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1423                            col_idx + 1,
1424                            first_col,
1425                            next_col
1426                        );
1427                    }
1428                }
1429
1430                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1431                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1432
1433                // Perform the set operation
1434                match operation {
1435                    SetOperation::UnionAll => {
1436                        // UNION ALL: Simply concatenate all rows without deduplication
1437                        for row in next_table.rows.iter() {
1438                            combined_table.add_row(row.clone());
1439                        }
1440                        plan.add_detail(format!(
1441                            "Result: {} rows (no deduplication)",
1442                            combined_table.row_count()
1443                        ));
1444                    }
1445                    SetOperation::Union => {
1446                        // UNION: Concatenate all rows first, deduplicate at the end
1447                        for row in next_table.rows.iter() {
1448                            combined_table.add_row(row.clone());
1449                        }
1450                        needs_deduplication = true;
1451                        plan.add_detail(format!(
1452                            "Combined: {} rows (deduplication pending)",
1453                            combined_table.row_count()
1454                        ));
1455                    }
1456                    SetOperation::Intersect => {
1457                        // INTERSECT: Keep only rows that appear in both
1458                        // TODO: Implement intersection logic
1459                        return Err(anyhow!("INTERSECT is not yet implemented"));
1460                    }
1461                    SetOperation::Except => {
1462                        // EXCEPT: Keep only rows from left that don't appear in right
1463                        // TODO: Implement except logic
1464                        return Err(anyhow!("EXCEPT is not yet implemented"));
1465                    }
1466                }
1467
1468                plan.add_detail(format!(
1469                    "Operation time: {:.3}ms",
1470                    op_start.elapsed().as_secs_f64() * 1000.0
1471                ));
1472                plan.set_rows_out(combined_table.row_count());
1473                plan.end_step();
1474            }
1475
1476            plan.set_rows_out(combined_table.row_count());
1477            plan.add_detail(format!(
1478                "Combined result: {} rows after {} operations",
1479                combined_table.row_count(),
1480                statement.set_operations.len()
1481            ));
1482            plan.end_step();
1483
1484            // Create a new view from the combined table
1485            view = DataView::new(Arc::new(combined_table));
1486
1487            // Apply deduplication if any UNION (not UNION ALL) operation was used
1488            if needs_deduplication {
1489                plan.begin_step(
1490                    StepType::Distinct,
1491                    "UNION deduplication - remove duplicate rows".to_string(),
1492                );
1493                plan.set_rows_in(view.row_count());
1494                plan.add_detail(format!("Input: {} rows", view.row_count()));
1495
1496                let distinct_start = Instant::now();
1497                view = self.apply_distinct(view)?;
1498
1499                plan.set_rows_out(view.row_count());
1500                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1501                plan.add_detail(format!(
1502                    "Deduplication time: {:.3}ms",
1503                    distinct_start.elapsed().as_secs_f64() * 1000.0
1504                ));
1505                plan.end_step();
1506            }
1507        }
1508
1509        Ok(view)
1510    }
1511
1512    /// Resolve column names to indices
1513    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1514        let mut indices = Vec::new();
1515        let table_columns = table.column_names();
1516
1517        for col_name in columns {
1518            let index = table_columns
1519                .iter()
1520                .position(|c| c.eq_ignore_ascii_case(col_name))
1521                .ok_or_else(|| {
1522                    let suggestion = self.find_similar_column(table, col_name);
1523                    match suggestion {
1524                        Some(similar) => anyhow::anyhow!(
1525                            "Column '{}' not found. Did you mean '{}'?",
1526                            col_name,
1527                            similar
1528                        ),
1529                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1530                    }
1531                })?;
1532            indices.push(index);
1533        }
1534
1535        Ok(indices)
1536    }
1537
1538    /// Apply SELECT items (columns and computed expressions) to create new view
1539    fn apply_select_items(
1540        &self,
1541        view: DataView,
1542        select_items: &[SelectItem],
1543        _statement: &SelectStatement,
1544        exec_context: Option<&ExecutionContext>,
1545    ) -> Result<DataView> {
1546        debug!(
1547            "QueryEngine::apply_select_items - items: {:?}",
1548            select_items
1549        );
1550        debug!(
1551            "QueryEngine::apply_select_items - input view has {} rows",
1552            view.row_count()
1553        );
1554
1555        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
1556        let has_unnest = select_items.iter().any(|item| match item {
1557            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1558            _ => false,
1559        });
1560
1561        if has_unnest {
1562            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1563            return self.apply_select_with_row_expansion(view, select_items);
1564        }
1565
1566        // Check if this is an aggregate query:
1567        // 1. At least one aggregate function exists
1568        // 2. All other items are either aggregates or constants (aggregate-compatible)
1569        let has_aggregates = select_items.iter().any(|item| match item {
1570            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1571            SelectItem::Column { .. } => false,
1572            SelectItem::Star { .. } => false,
1573            SelectItem::StarExclude { .. } => false,
1574        });
1575
1576        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1577            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1578            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
1579            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
1580            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
1581        });
1582
1583        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1584            // Special handling for aggregate queries with constants (no GROUP BY)
1585            // These should produce exactly one row
1586            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1587            return self.apply_aggregate_select(view, select_items);
1588        }
1589
1590        // Check if we need to create computed columns
1591        let has_computed_expressions = select_items
1592            .iter()
1593            .any(|item| matches!(item, SelectItem::Expression { .. }));
1594
1595        debug!(
1596            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1597            has_computed_expressions
1598        );
1599
1600        if !has_computed_expressions {
1601            // Simple case: only columns, use existing projection logic
1602            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1603            return Ok(view.with_columns(column_indices));
1604        }
1605
1606        // Complex case: we have computed expressions
1607        // IMPORTANT: We create a PROJECTED view, not a new table
1608        // This preserves the original DataTable reference
1609
1610        let source_table = view.source();
1611        let visible_rows = view.visible_row_indices();
1612
1613        // Create a temporary table just for the computed result view
1614        // But this table is only used for the current query result
1615        let mut computed_table = DataTable::new("query_result");
1616
1617        // First, expand any Star selectors to actual columns
1618        let mut expanded_items = Vec::new();
1619        for item in select_items {
1620            match item {
1621                SelectItem::Star { table_prefix, .. } => {
1622                    if let Some(prefix) = table_prefix {
1623                        // Scoped expansion: table.* expands only columns from that table
1624                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
1625                        for col in &source_table.columns {
1626                            if Self::column_matches_table(col, prefix) {
1627                                expanded_items.push(SelectItem::Column {
1628                                    column: ColumnRef::unquoted(col.name.clone()),
1629                                    leading_comments: vec![],
1630                                    trailing_comment: None,
1631                                });
1632                            }
1633                        }
1634                    } else {
1635                        // Unscoped expansion: * expands to all columns
1636                        debug!("QueryEngine::apply_select_items - expanding *");
1637                        for col_name in source_table.column_names() {
1638                            expanded_items.push(SelectItem::Column {
1639                                column: ColumnRef::unquoted(col_name.to_string()),
1640                                leading_comments: vec![],
1641                                trailing_comment: None,
1642                            });
1643                        }
1644                    }
1645                }
1646                _ => expanded_items.push(item.clone()),
1647            }
1648        }
1649
1650        // Add columns based on expanded SelectItems, handling duplicates
1651        let mut column_name_counts: std::collections::HashMap<String, usize> =
1652            std::collections::HashMap::new();
1653
1654        for item in &expanded_items {
1655            let base_name = match item {
1656                SelectItem::Column {
1657                    column: col_ref, ..
1658                } => col_ref.name.clone(),
1659                SelectItem::Expression { alias, .. } => alias.clone(),
1660                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1661                SelectItem::StarExclude { .. } => {
1662                    unreachable!("StarExclude should have been expanded")
1663                }
1664            };
1665
1666            // Check if this column name has been used before
1667            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1668            let column_name = if *count == 0 {
1669                // First occurrence, use the name as-is
1670                base_name.clone()
1671            } else {
1672                // Duplicate, append a suffix
1673                format!("{base_name}_{count}")
1674            };
1675            *count += 1;
1676
1677            computed_table.add_column(DataColumn::new(&column_name));
1678        }
1679
1680        // Calculate values for each row
1681        let mut evaluator =
1682            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1683
1684        // Populate table aliases from exec_context if available
1685        if let Some(exec_ctx) = exec_context {
1686            let aliases = exec_ctx.get_aliases();
1687            if !aliases.is_empty() {
1688                debug!(
1689                    "Applying {} aliases to evaluator: {:?}",
1690                    aliases.len(),
1691                    aliases
1692                );
1693                evaluator = evaluator.with_table_aliases(aliases);
1694            }
1695        }
1696
1697        for &row_idx in visible_rows {
1698            let mut row_values = Vec::new();
1699
1700            for item in &expanded_items {
1701                let value = match item {
1702                    SelectItem::Column {
1703                        column: col_ref, ..
1704                    } => {
1705                        // Use evaluator for column resolution (handles aliases properly)
1706                        match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1707                            Ok(val) => val,
1708                            Err(e) => {
1709                                return Err(anyhow!(
1710                                    "Failed to evaluate column {}: {}",
1711                                    col_ref.to_sql(),
1712                                    e
1713                                ));
1714                            }
1715                        }
1716                    }
1717                    SelectItem::Expression { expr, .. } => {
1718                        // Computed expression
1719                        evaluator.evaluate(&expr, row_idx)?
1720                    }
1721                    SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1722                    SelectItem::StarExclude { .. } => {
1723                        unreachable!("StarExclude should have been expanded")
1724                    }
1725                };
1726                row_values.push(value);
1727            }
1728
1729            computed_table
1730                .add_row(DataRow::new(row_values))
1731                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1732        }
1733
1734        // Return a view of the computed result
1735        // This is a temporary view for this query only
1736        Ok(DataView::new(Arc::new(computed_table)))
1737    }
1738
1739    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
1740    fn apply_select_with_row_expansion(
1741        &self,
1742        view: DataView,
1743        select_items: &[SelectItem],
1744    ) -> Result<DataView> {
1745        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1746
1747        let source_table = view.source();
1748        let visible_rows = view.visible_row_indices();
1749        let expander_registry = RowExpanderRegistry::new();
1750
1751        // Create result table
1752        let mut result_table = DataTable::new("unnest_result");
1753
1754        // Expand * to columns and set up result columns
1755        let mut expanded_items = Vec::new();
1756        for item in select_items {
1757            match item {
1758                SelectItem::Star { table_prefix, .. } => {
1759                    if let Some(prefix) = table_prefix {
1760                        // Scoped expansion: table.* expands only columns from that table
1761                        debug!(
1762                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
1763                            prefix
1764                        );
1765                        for col in &source_table.columns {
1766                            if Self::column_matches_table(col, prefix) {
1767                                expanded_items.push(SelectItem::Column {
1768                                    column: ColumnRef::unquoted(col.name.clone()),
1769                                    leading_comments: vec![],
1770                                    trailing_comment: None,
1771                                });
1772                            }
1773                        }
1774                    } else {
1775                        // Unscoped expansion: * expands to all columns
1776                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
1777                        for col_name in source_table.column_names() {
1778                            expanded_items.push(SelectItem::Column {
1779                                column: ColumnRef::unquoted(col_name.to_string()),
1780                                leading_comments: vec![],
1781                                trailing_comment: None,
1782                            });
1783                        }
1784                    }
1785                }
1786                _ => expanded_items.push(item.clone()),
1787            }
1788        }
1789
1790        // Add columns to result table
1791        for item in &expanded_items {
1792            let column_name = match item {
1793                SelectItem::Column {
1794                    column: col_ref, ..
1795                } => col_ref.name.clone(),
1796                SelectItem::Expression { alias, .. } => alias.clone(),
1797                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1798                SelectItem::StarExclude { .. } => {
1799                    unreachable!("StarExclude should have been expanded")
1800                }
1801            };
1802            result_table.add_column(DataColumn::new(&column_name));
1803        }
1804
1805        // Process each input row
1806        let mut evaluator =
1807            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1808
1809        for &row_idx in visible_rows {
1810            // First pass: identify UNNEST expressions and collect their expansion arrays
1811            let mut unnest_expansions = Vec::new();
1812            let mut unnest_indices = Vec::new();
1813
1814            for (col_idx, item) in expanded_items.iter().enumerate() {
1815                if let SelectItem::Expression { expr, .. } = item {
1816                    if let Some(expansion_result) = self.try_expand_unnest(
1817                        &expr,
1818                        source_table,
1819                        row_idx,
1820                        &mut evaluator,
1821                        &expander_registry,
1822                    )? {
1823                        unnest_expansions.push(expansion_result);
1824                        unnest_indices.push(col_idx);
1825                    }
1826                }
1827            }
1828
1829            // Determine how many output rows to generate
1830            let expansion_count = if unnest_expansions.is_empty() {
1831                1 // No UNNEST, just one row
1832            } else {
1833                unnest_expansions
1834                    .iter()
1835                    .map(|exp| exp.row_count())
1836                    .max()
1837                    .unwrap_or(1)
1838            };
1839
1840            // Generate output rows
1841            for output_idx in 0..expansion_count {
1842                let mut row_values = Vec::new();
1843
1844                for (col_idx, item) in expanded_items.iter().enumerate() {
1845                    // Check if this column is an UNNEST column
1846                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1847
1848                    let value = if let Some(unnest_idx) = unnest_position {
1849                        // Get value from expansion array (or NULL if exhausted)
1850                        let expansion = &unnest_expansions[unnest_idx];
1851                        expansion
1852                            .values
1853                            .get(output_idx)
1854                            .cloned()
1855                            .unwrap_or(DataValue::Null)
1856                    } else {
1857                        // Regular column or non-UNNEST expression - replicate from input
1858                        match item {
1859                            SelectItem::Column {
1860                                column: col_ref, ..
1861                            } => {
1862                                let col_idx =
1863                                    source_table.get_column_index(&col_ref.name).ok_or_else(
1864                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1865                                    )?;
1866                                let row = source_table
1867                                    .get_row(row_idx)
1868                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1869                                row.get(col_idx)
1870                                    .ok_or_else(|| {
1871                                        anyhow::anyhow!("Column {} not found in row", col_idx)
1872                                    })?
1873                                    .clone()
1874                            }
1875                            SelectItem::Expression { expr, .. } => {
1876                                // Non-UNNEST expression - evaluate once and replicate
1877                                evaluator.evaluate(&expr, row_idx)?
1878                            }
1879                            SelectItem::Star { .. } => unreachable!(),
1880                            SelectItem::StarExclude { .. } => {
1881                                unreachable!("StarExclude should have been expanded")
1882                            }
1883                        }
1884                    };
1885
1886                    row_values.push(value);
1887                }
1888
1889                result_table
1890                    .add_row(DataRow::new(row_values))
1891                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1892            }
1893        }
1894
1895        debug!(
1896            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1897            visible_rows.len(),
1898            result_table.row_count()
1899        );
1900
1901        Ok(DataView::new(Arc::new(result_table)))
1902    }
1903
1904    /// Try to expand an expression if it's an UNNEST call
1905    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
1906    fn try_expand_unnest(
1907        &self,
1908        expr: &SqlExpression,
1909        _source_table: &DataTable,
1910        row_idx: usize,
1911        evaluator: &mut ArithmeticEvaluator,
1912        expander_registry: &RowExpanderRegistry,
1913    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1914        // Check for UNNEST variant (direct syntax)
1915        if let SqlExpression::Unnest { column, delimiter } = expr {
1916            // Evaluate the column expression
1917            let column_value = evaluator.evaluate(column, row_idx)?;
1918
1919            // Delimiter is already a string literal
1920            let delimiter_value = DataValue::String(delimiter.clone());
1921
1922            // Get the UNNEST expander
1923            let expander = expander_registry
1924                .get("UNNEST")
1925                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1926
1927            // Expand the value
1928            let expansion = expander.expand(&column_value, &[delimiter_value])?;
1929            return Ok(Some(expansion));
1930        }
1931
1932        // Also check for FunctionCall form (for compatibility)
1933        if let SqlExpression::FunctionCall { name, args, .. } = expr {
1934            if name.to_uppercase() == "UNNEST" {
1935                // UNNEST(column, delimiter)
1936                if args.len() != 2 {
1937                    return Err(anyhow::anyhow!(
1938                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1939                    ));
1940                }
1941
1942                // Evaluate the column expression (first arg)
1943                let column_value = evaluator.evaluate(&args[0], row_idx)?;
1944
1945                // Evaluate the delimiter expression (second arg)
1946                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1947
1948                // Get the UNNEST expander
1949                let expander = expander_registry
1950                    .get("UNNEST")
1951                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1952
1953                // Expand the value
1954                let expansion = expander.expand(&column_value, &[delimiter_value])?;
1955                return Ok(Some(expansion));
1956            }
1957        }
1958
1959        Ok(None)
1960    }
1961
1962    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1963    fn apply_aggregate_select(
1964        &self,
1965        view: DataView,
1966        select_items: &[SelectItem],
1967    ) -> Result<DataView> {
1968        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1969
1970        let source_table = view.source();
1971        let mut result_table = DataTable::new("aggregate_result");
1972
1973        // Add columns for each select item
1974        for item in select_items {
1975            let column_name = match item {
1976                SelectItem::Expression { alias, .. } => alias.clone(),
1977                _ => unreachable!("Should only have expressions in aggregate-only query"),
1978            };
1979            result_table.add_column(DataColumn::new(&column_name));
1980        }
1981
1982        // Create evaluator with visible rows from the view (for filtered aggregates)
1983        let visible_rows = view.visible_row_indices().to_vec();
1984        let mut evaluator =
1985            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1986                .with_visible_rows(visible_rows);
1987
1988        // Evaluate each aggregate expression once (they handle all rows internally)
1989        let mut row_values = Vec::new();
1990        for item in select_items {
1991            match item {
1992                SelectItem::Expression { expr, .. } => {
1993                    // The evaluator will handle aggregates over all rows
1994                    // We pass row_index=0 but aggregates ignore it and process all rows
1995                    let value = evaluator.evaluate(expr, 0)?;
1996                    row_values.push(value);
1997                }
1998                _ => unreachable!("Should only have expressions in aggregate-only query"),
1999            }
2000        }
2001
2002        // Add the single result row
2003        result_table
2004            .add_row(DataRow::new(row_values))
2005            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2006
2007        Ok(DataView::new(Arc::new(result_table)))
2008    }
2009
2010    /// Check if a column belongs to a specific table based on source_table or qualified_name
2011    ///
2012    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2013    /// to filter which columns should be included.
2014    ///
2015    /// # Arguments
2016    /// * `col` - The column to check
2017    /// * `table_name` - The table name or alias to match against
2018    ///
2019    /// # Returns
2020    /// `true` if the column belongs to the specified table
2021    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2022        // First, check the source_table field
2023        if let Some(ref source) = col.source_table {
2024            // Direct match or matches with schema qualification
2025            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2026                return true;
2027            }
2028        }
2029
2030        // Second, check the qualified_name field
2031        if let Some(ref qualified) = col.qualified_name {
2032            // Check if qualified name starts with "table_name."
2033            if qualified.starts_with(&format!("{}.", table_name)) {
2034                return true;
2035            }
2036        }
2037
2038        false
2039    }
2040
2041    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2042    fn resolve_select_columns(
2043        &self,
2044        table: &DataTable,
2045        select_items: &[SelectItem],
2046    ) -> Result<Vec<usize>> {
2047        let mut indices = Vec::new();
2048        let table_columns = table.column_names();
2049
2050        for item in select_items {
2051            match item {
2052                SelectItem::Column {
2053                    column: col_ref, ..
2054                } => {
2055                    // Check if this has a table prefix
2056                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2057                        // For qualified references, ONLY try qualified lookup - no fallback
2058                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2059                        table.find_column_by_qualified_name(&qualified_name)
2060                            .ok_or_else(|| {
2061                                // Check if any columns have qualified names for better error message
2062                                let has_qualified = table.columns.iter()
2063                                    .any(|c| c.qualified_name.is_some());
2064                                if !has_qualified {
2065                                    anyhow::anyhow!(
2066                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2067                                        qualified_name, table_prefix
2068                                    )
2069                                } else {
2070                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2071                                }
2072                            })?
2073                    } else {
2074                        // Simple column name lookup
2075                        table_columns
2076                            .iter()
2077                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2078                            .ok_or_else(|| {
2079                                let suggestion = self.find_similar_column(table, &col_ref.name);
2080                                match suggestion {
2081                                    Some(similar) => anyhow::anyhow!(
2082                                        "Column '{}' not found. Did you mean '{}'?",
2083                                        col_ref.name,
2084                                        similar
2085                                    ),
2086                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2087                                }
2088                            })?
2089                    };
2090                    indices.push(index);
2091                }
2092                SelectItem::Star { table_prefix, .. } => {
2093                    if let Some(prefix) = table_prefix {
2094                        // Scoped expansion: table.* expands only columns from that table
2095                        for (i, col) in table.columns.iter().enumerate() {
2096                            if Self::column_matches_table(col, prefix) {
2097                                indices.push(i);
2098                            }
2099                        }
2100                    } else {
2101                        // Unscoped expansion: * expands to all column indices
2102                        for i in 0..table_columns.len() {
2103                            indices.push(i);
2104                        }
2105                    }
2106                }
2107                SelectItem::StarExclude {
2108                    table_prefix,
2109                    excluded_columns,
2110                    ..
2111                } => {
2112                    // Expand all columns (with optional table prefix), then exclude specified ones
2113                    if let Some(prefix) = table_prefix {
2114                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2115                        for (i, col) in table.columns.iter().enumerate() {
2116                            if Self::column_matches_table(col, prefix)
2117                                && !excluded_columns.contains(&col.name)
2118                            {
2119                                indices.push(i);
2120                            }
2121                        }
2122                    } else {
2123                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2124                        for (i, col_name) in table_columns.iter().enumerate() {
2125                            if !excluded_columns
2126                                .iter()
2127                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2128                            {
2129                                indices.push(i);
2130                            }
2131                        }
2132                    }
2133                }
2134                SelectItem::Expression { .. } => {
2135                    return Err(anyhow::anyhow!(
2136                        "Computed expressions require new table creation"
2137                    ));
2138                }
2139            }
2140        }
2141
2142        Ok(indices)
2143    }
2144
2145    /// Apply DISTINCT to remove duplicate rows
2146    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2147        use std::collections::HashSet;
2148
2149        let source = view.source();
2150        let visible_cols = view.visible_column_indices();
2151        let visible_rows = view.visible_row_indices();
2152
2153        // Build a set to track unique rows
2154        let mut seen_rows = HashSet::new();
2155        let mut unique_row_indices = Vec::new();
2156
2157        for &row_idx in visible_rows {
2158            // Build a key representing this row's visible column values
2159            let mut row_key = Vec::new();
2160            for &col_idx in visible_cols {
2161                let value = source
2162                    .get_value(row_idx, col_idx)
2163                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2164                // Convert value to a hashable representation
2165                row_key.push(format!("{:?}", value));
2166            }
2167
2168            // Check if we've seen this row before
2169            if seen_rows.insert(row_key) {
2170                // First time seeing this row combination
2171                unique_row_indices.push(row_idx);
2172            }
2173        }
2174
2175        // Create a new view with only unique rows
2176        Ok(view.with_rows(unique_row_indices))
2177    }
2178
2179    /// Apply multi-column ORDER BY sorting to the view
2180    fn apply_multi_order_by(
2181        &self,
2182        view: DataView,
2183        order_by_columns: &[OrderByItem],
2184    ) -> Result<DataView> {
2185        self.apply_multi_order_by_with_context(view, order_by_columns, None)
2186    }
2187
2188    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
2189    fn apply_multi_order_by_with_context(
2190        &self,
2191        mut view: DataView,
2192        order_by_columns: &[OrderByItem],
2193        _exec_context: Option<&ExecutionContext>,
2194    ) -> Result<DataView> {
2195        // Build list of (source_column_index, ascending) tuples
2196        let mut sort_columns = Vec::new();
2197
2198        for order_col in order_by_columns {
2199            // Extract column name from expression (currently only supports simple columns)
2200            let column_name = match &order_col.expr {
2201                SqlExpression::Column(col_ref) => col_ref.name.clone(),
2202                _ => {
2203                    // TODO: Support expression evaluation in ORDER BY
2204                    return Err(anyhow!(
2205                        "ORDER BY expressions not yet supported - only simple columns allowed"
2206                    ));
2207                }
2208            };
2209
2210            // Try to find the column index, handling qualified column names (table.column)
2211            let col_index = if column_name.contains('.') {
2212                // Qualified column name - extract unqualified part
2213                if let Some(dot_pos) = column_name.rfind('.') {
2214                    let col_name = &column_name[dot_pos + 1..];
2215
2216                    // After SELECT processing, columns are unqualified
2217                    // So just use the column name part
2218                    debug!(
2219                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
2220                        col_name, column_name
2221                    );
2222                    view.source().get_column_index(col_name)
2223                } else {
2224                    view.source().get_column_index(&column_name)
2225                }
2226            } else {
2227                // Simple column name
2228                view.source().get_column_index(&column_name)
2229            }
2230            .ok_or_else(|| {
2231                // If not found, provide helpful error with suggestions
2232                let suggestion = self.find_similar_column(view.source(), &column_name);
2233                match suggestion {
2234                    Some(similar) => anyhow::anyhow!(
2235                        "Column '{}' not found. Did you mean '{}'?",
2236                        column_name,
2237                        similar
2238                    ),
2239                    None => {
2240                        // Also list available columns for debugging
2241                        let available_cols = view.source().column_names().join(", ");
2242                        anyhow::anyhow!(
2243                            "Column '{}' not found. Available columns: {}",
2244                            column_name,
2245                            available_cols
2246                        )
2247                    }
2248                }
2249            })?;
2250
2251            let ascending = matches!(order_col.direction, SortDirection::Asc);
2252            sort_columns.push((col_index, ascending));
2253        }
2254
2255        // Apply multi-column sorting
2256        view.apply_multi_sort(&sort_columns)?;
2257        Ok(view)
2258    }
2259
2260    /// Apply GROUP BY to the view with optional HAVING clause
2261    fn apply_group_by(
2262        &self,
2263        view: DataView,
2264        group_by_exprs: &[SqlExpression],
2265        select_items: &[SelectItem],
2266        having: Option<&SqlExpression>,
2267        plan: &mut ExecutionPlanBuilder,
2268    ) -> Result<DataView> {
2269        // Use the new expression-based GROUP BY implementation
2270        let (result_view, phase_info) = self.apply_group_by_expressions(
2271            view,
2272            group_by_exprs,
2273            select_items,
2274            having,
2275            self.case_insensitive,
2276            self.date_notation.clone(),
2277        )?;
2278
2279        // Add detailed phase information to the execution plan
2280        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2281        plan.add_detail(format!(
2282            "Phase 1 - Group Building: {:.3}ms",
2283            phase_info.phase2_key_building.as_secs_f64() * 1000.0
2284        ));
2285        plan.add_detail(format!(
2286            "  • Processing {} rows into {} groups",
2287            phase_info.total_rows, phase_info.num_groups
2288        ));
2289        plan.add_detail(format!(
2290            "Phase 2 - Aggregation: {:.3}ms",
2291            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2292        ));
2293        if phase_info.phase4_having_evaluation > Duration::ZERO {
2294            plan.add_detail(format!(
2295                "Phase 3 - HAVING Filter: {:.3}ms",
2296                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2297            ));
2298            plan.add_detail(format!(
2299                "  • Filtered {} groups",
2300                phase_info.groups_filtered_by_having
2301            ));
2302        }
2303        plan.add_detail(format!(
2304            "Total GROUP BY time: {:.3}ms",
2305            phase_info.total_time.as_secs_f64() * 1000.0
2306        ));
2307
2308        Ok(result_view)
2309    }
2310
2311    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
2312    /// This helps pre-size hash tables for better performance
2313    pub fn estimate_group_cardinality(
2314        &self,
2315        view: &DataView,
2316        group_by_exprs: &[SqlExpression],
2317    ) -> usize {
2318        // If we have few rows, just return the row count as upper bound
2319        let row_count = view.get_visible_rows().len();
2320        if row_count <= 100 {
2321            return row_count;
2322        }
2323
2324        // Sample first 1000 rows or 10% of data, whichever is smaller
2325        let sample_size = min(1000, row_count / 10).max(100);
2326        let mut seen = FxHashSet::default();
2327
2328        let visible_rows = view.get_visible_rows();
2329        for (i, &row_idx) in visible_rows.iter().enumerate() {
2330            if i >= sample_size {
2331                break;
2332            }
2333
2334            // Evaluate GROUP BY expressions for this row
2335            let mut key_values = Vec::new();
2336            for expr in group_by_exprs {
2337                let mut evaluator = ArithmeticEvaluator::new(view.source());
2338                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2339                key_values.push(value);
2340            }
2341
2342            seen.insert(key_values);
2343        }
2344
2345        // Estimate total cardinality based on sample
2346        let sample_cardinality = seen.len();
2347        let estimated = (sample_cardinality * row_count) / sample_size;
2348
2349        // Cap at row count and ensure minimum of sample cardinality
2350        estimated.min(row_count).max(sample_cardinality)
2351    }
2352}
2353
2354#[cfg(test)]
2355mod tests {
2356    use super::*;
2357    use crate::data::datatable::{DataColumn, DataRow, DataValue};
2358
2359    fn create_test_table() -> Arc<DataTable> {
2360        let mut table = DataTable::new("test");
2361
2362        // Add columns
2363        table.add_column(DataColumn::new("id"));
2364        table.add_column(DataColumn::new("name"));
2365        table.add_column(DataColumn::new("age"));
2366
2367        // Add rows
2368        table
2369            .add_row(DataRow::new(vec![
2370                DataValue::Integer(1),
2371                DataValue::String("Alice".to_string()),
2372                DataValue::Integer(30),
2373            ]))
2374            .unwrap();
2375
2376        table
2377            .add_row(DataRow::new(vec![
2378                DataValue::Integer(2),
2379                DataValue::String("Bob".to_string()),
2380                DataValue::Integer(25),
2381            ]))
2382            .unwrap();
2383
2384        table
2385            .add_row(DataRow::new(vec![
2386                DataValue::Integer(3),
2387                DataValue::String("Charlie".to_string()),
2388                DataValue::Integer(35),
2389            ]))
2390            .unwrap();
2391
2392        Arc::new(table)
2393    }
2394
/// `SELECT *` returns every row and every column of the fixture.
#[test]
fn test_select_all() {
    let fixture = create_test_table();
    let engine = QueryEngine::new();

    let outcome = engine.execute(fixture.clone(), "SELECT * FROM users");
    let view = outcome.unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 3);
}
2406
/// Selecting two named columns keeps all rows but narrows the column set.
#[test]
fn test_select_columns() {
    let fixture = create_test_table();
    let engine = QueryEngine::new();

    let outcome = engine.execute(fixture.clone(), "SELECT name, age FROM users");
    let view = outcome.unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 2);
}
2418
/// `LIMIT 2` truncates the three-row fixture to two rows.
#[test]
fn test_select_with_limit() {
    let fixture = create_test_table();
    let engine = QueryEngine::new();

    let outcome = engine.execute(fixture.clone(), "SELECT * FROM users LIMIT 2");
    let view = outcome.unwrap();

    assert_eq!(view.row_count(), 2);
}
2429
/// `Contains` works on a string column and, via coercion, on a float column.
#[test]
fn test_type_coercion_contains() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "status", "price"] {
        builder.add_column(DataColumn::new(header));
    }

    // (id, status, price) - mixed integer / string / float cells
    let fixtures = [
        (1, "Pending", 99.99),
        (2, "Confirmed", 150.50),
        (3, "Pending", 75.00),
    ];
    for (id, label, price) in fixtures {
        builder
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(label.to_string()),
                DataValue::Float(price),
            ]))
            .unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing WHERE clause with Contains ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    // Test 1: substring match on a string column
    println!("\n--- Test 1: status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("Query failed: {e}"));
    println!("SUCCESS: Found {} matching rows", view.row_count());
    assert_eq!(view.row_count(), 2); // both Pending rows

    // Test 2: substring match on a numeric column (needs type coercion)
    println!("\n--- Test 2: price.Contains('9') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        )
        .unwrap_or_else(|e| panic!("Numeric coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} matching rows with price containing '9'",
        view.row_count()
    );
    // At least the 99.99 row must match
    assert!(view.row_count() >= 1);

    println!("\n=== All tests passed! ===");
}
2515
/// `NOT IN ('CA')` drops the CA row and keeps the other two.
#[test]
fn test_not_in_clause() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "country"] {
        builder.add_column(DataColumn::new(header));
    }

    for (id, code) in [(1, "CA"), (2, "US"), (3, "UK")] {
        builder
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(code.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing NOT IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    // Excluding CA should leave US and UK (2 rows)
    println!("\n--- Test: country NOT IN ('CA') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('CA')",
        )
        .unwrap_or_else(|e| panic!("NOT IN query failed: {e}"));
    println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
    assert_eq!(view.row_count(), 2); // US and UK

    println!("\n=== NOT IN test complete! ===");
}
2577
/// IN / NOT IN honour the engine's case-insensitivity flag, and the default
/// engine stays case-sensitive.
#[test]
fn test_case_insensitive_in_and_not_in() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "country"] {
        builder.add_column(DataColumn::new(header));
    }

    // Mixed-case country codes: "CA" and "UK" upper, "us" lower
    for (id, code) in [(1, "CA"), (2, "us"), (3, "UK")] {
        builder
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(code.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(builder);

    println!("\n=== Testing Case-Insensitive IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    // Case-insensitive IN: 'ca' must match the stored 'CA'
    println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
    let engine = QueryEngine::with_case_insensitive(true);
    let view = engine
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-insensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1); // the CA row

    // Case-insensitive NOT IN: 'ca' must exclude the stored 'CA'
    println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('ca')",
        )
        .unwrap_or_else(|e| panic!("Case-insensitive NOT IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2); // the us and UK rows

    // Default engine is case-sensitive: 'ca' must NOT match 'CA'
    println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
    let engine_case_sensitive = QueryEngine::new();
    let view = engine_case_sensitive
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-sensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 0); // CA != ca

    println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
}
2676
/// Parenthesised AND groups joined by OR select exactly the matching rows.
/// Currently ignored; see the `#[ignore]` reason.
#[test]
#[ignore = "Parentheses in WHERE clause not yet implemented"]
fn test_parentheses_in_where_clause() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "status", "priority"] {
        builder.add_column(DataColumn::new(header));
    }

    // (id, status, priority) - every status/priority combination once
    let fixtures = [
        (1, "Pending", "High"),
        (2, "Complete", "High"),
        (3, "Pending", "Low"),
        (4, "Complete", "Low"),
    ];
    for (id, state, level) in fixtures {
        builder
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(state.to_string()),
                DataValue::String(level.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing Parentheses in WHERE clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        println!("Row {i}: status = {status:?}, priority = {priority:?}");
    }

    // (Pending AND High) OR (Complete AND Low) should select rows 1 and 4
    println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("Parentheses query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with parenthetical logic",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2); // rows 1 and 4

    println!("\n=== Parentheses test complete! ===");
}
2755
/// `Contains` on float and integer columns matches against their textual
/// form. Currently ignored; see the `#[ignore]` reason.
#[test]
#[ignore = "Numeric type coercion needs fixing"]
fn test_numeric_type_coercion() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "price", "quantity"] {
        builder.add_column(DataColumn::new(header));
    }

    // The price column deliberately mixes floats and an integer.
    let fixtures = vec![
        vec![
            DataValue::Integer(1),
            DataValue::Float(99.50), // Contains '.'
            DataValue::Integer(100),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(150.0), // Contains '.' and '0'
            DataValue::Integer(200),
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Integer(75), // No decimal point
            DataValue::Integer(50),
        ],
    ];
    for cells in fixtures {
        builder.add_row(DataRow::new(cells)).unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing Numeric Type Coercion ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let price = table.get_value(i, 1);
        let quantity = table.get_value(i, 2);
        println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
    }

    // Float prices: only the rows whose text form has a decimal point
    println!("\n--- Test: price.Contains('.') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('.')",
        )
        .unwrap_or_else(|e| panic!("Numeric Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with decimal points in price",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2); // 99.50 and 150.0

    // Integer quantities rendered as text
    println!("\n--- Test: quantity.Contains('0') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE quantity.Contains('0')",
        )
        .unwrap_or_else(|e| panic!("Integer Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with '0' in quantity",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2); // 100 and 200

    println!("\n=== Numeric type coercion test complete! ===");
}
2845
/// Date strings compare correctly against a `DateTime(y,m,d)` constructor.
#[test]
fn test_datetime_comparisons() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "created_date"] {
        builder.add_column(DataColumn::new(header));
    }

    // Dates are stored as plain strings, as they would arrive from a CSV.
    for (id, stamp) in [(1, "2024-12-15"), (2, "2025-01-15"), (3, "2025-02-15")] {
        builder
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(stamp.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::new();

    println!("\n=== Testing DateTime Comparisons ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let date = table.get_value(i, 1);
        println!("Row {i}: created_date = {date:?}");
    }

    // Only the two 2025 dates fall after 2025-01-01
    println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
        )
        .unwrap_or_else(|e| panic!("DateTime comparison query failed: {e}"));
    println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
    assert_eq!(view.row_count(), 2); // 2025-01-15 and 2025-02-15

    println!("\n=== DateTime comparison test complete! ===");
}
2907
/// `NOT` negates method-call predicates such as `Contains` and `StartsWith`.
#[test]
fn test_not_with_method_calls() {
    // Best-effort tracing setup; a subscriber may already be installed.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut builder = DataTable::new("test");
    for header in ["id", "status"] {
        builder.add_column(DataColumn::new(header));
    }

    let fixtures = [
        (1, "Pending Review"),
        (2, "Complete"),
        (3, "Pending Approval"),
    ];
    for (id, label) in fixtures {
        builder
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(label.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(builder);
    let engine = QueryEngine::with_case_insensitive(true);

    println!("\n=== Testing NOT with Method Calls ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    // NOT Contains: rows containing "pend" are excluded
    println!("\n--- Test: NOT status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("NOT Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT containing 'pend'",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1); // only "Complete"

    // NOT StartsWith: rows beginning with "Pending" are excluded
    println!("\n--- Test: NOT status.StartsWith('Pending') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
        )
        .unwrap_or_else(|e| panic!("NOT StartsWith query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT starting with 'Pending'",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1); // only "Complete"

    println!("\n=== NOT with method calls test complete! ===");
}
2991
2992    #[test]
2993    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2994    fn test_complex_logical_expressions() {
2995        // Initialize tracing for debug output
2996        let _ = tracing_subscriber::fmt()
2997            .with_max_level(tracing::Level::DEBUG)
2998            .try_init();
2999
3000        let mut table = DataTable::new("test");
3001        table.add_column(DataColumn::new("id"));
3002        table.add_column(DataColumn::new("status"));
3003        table.add_column(DataColumn::new("priority"));
3004        table.add_column(DataColumn::new("assigned"));
3005
3006        // Add comprehensive test data
3007        table
3008            .add_row(DataRow::new(vec![
3009                DataValue::Integer(1),
3010                DataValue::String("Pending".to_string()),
3011                DataValue::String("High".to_string()),
3012                DataValue::String("John".to_string()),
3013            ]))
3014            .unwrap();
3015
3016        table
3017            .add_row(DataRow::new(vec![
3018                DataValue::Integer(2),
3019                DataValue::String("Complete".to_string()),
3020                DataValue::String("High".to_string()),
3021                DataValue::String("Jane".to_string()),
3022            ]))
3023            .unwrap();
3024
3025        table
3026            .add_row(DataRow::new(vec![
3027                DataValue::Integer(3),
3028                DataValue::String("Pending".to_string()),
3029                DataValue::String("Low".to_string()),
3030                DataValue::String("John".to_string()),
3031            ]))
3032            .unwrap();
3033
3034        table
3035            .add_row(DataRow::new(vec![
3036                DataValue::Integer(4),
3037                DataValue::String("In Progress".to_string()),
3038                DataValue::String("Medium".to_string()),
3039                DataValue::String("Jane".to_string()),
3040            ]))
3041            .unwrap();
3042
3043        let table = Arc::new(table);
3044        let engine = QueryEngine::new();
3045
3046        println!("\n=== Testing Complex Logical Expressions ===");
3047        println!("Table has {} rows", table.row_count());
3048        for i in 0..table.row_count() {
3049            let status = table.get_value(i, 1);
3050            let priority = table.get_value(i, 2);
3051            let assigned = table.get_value(i, 3);
3052            println!(
3053                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3054            );
3055        }
3056
3057        // Test complex AND/OR logic
3058        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3059        let result = engine.execute(
3060            table.clone(),
3061            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3062        );
3063        match result {
3064            Ok(view) => {
3065                println!(
3066                    "SUCCESS: Found {} rows with complex logic",
3067                    view.row_count()
3068                );
3069                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3070            }
3071            Err(e) => {
3072                panic!("Complex logic query failed: {e}");
3073            }
3074        }
3075
3076        // Test NOT with complex expressions
3077        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3078        let result = engine.execute(
3079            table.clone(),
3080            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3081        );
3082        match result {
3083            Ok(view) => {
3084                println!(
3085                    "SUCCESS: Found {} rows with NOT complex logic",
3086                    view.row_count()
3087                );
3088                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3089            }
3090            Err(e) => {
3091                panic!("NOT complex logic query failed: {e}");
3092            }
3093        }
3094
3095        println!("\n=== Complex logical expressions test complete! ===");
3096    }
3097
3098    #[test]
3099    fn test_mixed_data_types_and_edge_cases() {
3100        // Initialize tracing for debug output
3101        let _ = tracing_subscriber::fmt()
3102            .with_max_level(tracing::Level::DEBUG)
3103            .try_init();
3104
3105        let mut table = DataTable::new("test");
3106        table.add_column(DataColumn::new("id"));
3107        table.add_column(DataColumn::new("value"));
3108        table.add_column(DataColumn::new("nullable_field"));
3109
3110        // Add test data with mixed types and edge cases
3111        table
3112            .add_row(DataRow::new(vec![
3113                DataValue::Integer(1),
3114                DataValue::String("123.45".to_string()),
3115                DataValue::String("present".to_string()),
3116            ]))
3117            .unwrap();
3118
3119        table
3120            .add_row(DataRow::new(vec![
3121                DataValue::Integer(2),
3122                DataValue::Float(678.90),
3123                DataValue::Null,
3124            ]))
3125            .unwrap();
3126
3127        table
3128            .add_row(DataRow::new(vec![
3129                DataValue::Integer(3),
3130                DataValue::Boolean(true),
3131                DataValue::String("also present".to_string()),
3132            ]))
3133            .unwrap();
3134
3135        table
3136            .add_row(DataRow::new(vec![
3137                DataValue::Integer(4),
3138                DataValue::String("false".to_string()),
3139                DataValue::Null,
3140            ]))
3141            .unwrap();
3142
3143        let table = Arc::new(table);
3144        let engine = QueryEngine::new();
3145
3146        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3147        println!("Table has {} rows", table.row_count());
3148        for i in 0..table.row_count() {
3149            let value = table.get_value(i, 1);
3150            let nullable = table.get_value(i, 2);
3151            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3152        }
3153
3154        // Test type coercion with boolean Contains
3155        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3156        let result = engine.execute(
3157            table.clone(),
3158            "SELECT * FROM test WHERE value.Contains('true')",
3159        );
3160        match result {
3161            Ok(view) => {
3162                println!(
3163                    "SUCCESS: Found {} rows with boolean coercion",
3164                    view.row_count()
3165                );
3166                assert_eq!(view.row_count(), 1); // Should find the boolean true row
3167            }
3168            Err(e) => {
3169                panic!("Boolean coercion query failed: {e}");
3170            }
3171        }
3172
3173        // Test multiple IN values with mixed types
3174        println!("\n--- Test: id IN (1, 3) ---");
3175        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3176        match result {
3177            Ok(view) => {
3178                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3179                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
3180            }
3181            Err(e) => {
3182                panic!("Multiple IN values query failed: {e}");
3183            }
3184        }
3185
3186        println!("\n=== Mixed data types test complete! ===");
3187    }
3188
3189    /// Test that aggregate-only queries return exactly one row (regression test)
3190    #[test]
3191    fn test_aggregate_only_single_row() {
3192        let table = create_test_stock_data();
3193        let engine = QueryEngine::new();
3194
3195        // Test query with multiple aggregates - should return exactly 1 row
3196        let result = engine
3197            .execute(
3198                table.clone(),
3199                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3200            )
3201            .expect("Query should succeed");
3202
3203        assert_eq!(
3204            result.row_count(),
3205            1,
3206            "Aggregate-only query should return exactly 1 row"
3207        );
3208        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3209
3210        // Verify the actual values are correct
3211        let source = result.source();
3212        let row = source.get_row(0).expect("Should have first row");
3213
3214        // COUNT(*) should be 5 (total rows)
3215        assert_eq!(row.values[0], DataValue::Integer(5));
3216
3217        // MIN should be 99.5
3218        assert_eq!(row.values[1], DataValue::Float(99.5));
3219
3220        // MAX should be 105.0
3221        assert_eq!(row.values[2], DataValue::Float(105.0));
3222
3223        // AVG should be approximately 102.4
3224        if let DataValue::Float(avg) = &row.values[3] {
3225            assert!(
3226                (avg - 102.4).abs() < 0.01,
3227                "Average should be approximately 102.4, got {}",
3228                avg
3229            );
3230        } else {
3231            panic!("AVG should return a Float value");
3232        }
3233    }
3234
3235    /// Test single aggregate function returns single row
3236    #[test]
3237    fn test_single_aggregate_single_row() {
3238        let table = create_test_stock_data();
3239        let engine = QueryEngine::new();
3240
3241        let result = engine
3242            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3243            .expect("Query should succeed");
3244
3245        assert_eq!(
3246            result.row_count(),
3247            1,
3248            "Single aggregate query should return exactly 1 row"
3249        );
3250        assert_eq!(result.column_count(), 1, "Should have 1 column");
3251
3252        let source = result.source();
3253        let row = source.get_row(0).expect("Should have first row");
3254        assert_eq!(row.values[0], DataValue::Integer(5));
3255    }
3256
3257    /// Test aggregate with WHERE clause filtering
3258    #[test]
3259    fn test_aggregate_with_where_single_row() {
3260        let table = create_test_stock_data();
3261        let engine = QueryEngine::new();
3262
3263        // Filter to only high-value stocks (>= 103.0) and aggregate
3264        let result = engine
3265            .execute(
3266                table.clone(),
3267                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3268            )
3269            .expect("Query should succeed");
3270
3271        assert_eq!(
3272            result.row_count(),
3273            1,
3274            "Filtered aggregate query should return exactly 1 row"
3275        );
3276        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3277
3278        let source = result.source();
3279        let row = source.get_row(0).expect("Should have first row");
3280
3281        // Should find 2 rows (103.5 and 105.0)
3282        assert_eq!(row.values[0], DataValue::Integer(2));
3283        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
3284        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
3285    }
3286
3287    #[test]
3288    fn test_not_in_parsing() {
3289        use crate::sql::recursive_parser::Parser;
3290
3291        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3292        println!("\n=== Testing NOT IN parsing ===");
3293        println!("Parsing query: {query}");
3294
3295        let mut parser = Parser::new(query);
3296        match parser.parse() {
3297            Ok(statement) => {
3298                println!("Parsed statement: {statement:#?}");
3299                if let Some(where_clause) = statement.where_clause {
3300                    println!("WHERE conditions: {:#?}", where_clause.conditions);
3301                    if let Some(first_condition) = where_clause.conditions.first() {
3302                        println!("First condition expression: {:#?}", first_condition.expr);
3303                    }
3304                }
3305            }
3306            Err(e) => {
3307                panic!("Parse error: {e}");
3308            }
3309        }
3310    }
3311
3312    /// Create test stock data for aggregate testing
3313    fn create_test_stock_data() -> Arc<DataTable> {
3314        let mut table = DataTable::new("stock");
3315
3316        table.add_column(DataColumn::new("symbol"));
3317        table.add_column(DataColumn::new("close"));
3318        table.add_column(DataColumn::new("volume"));
3319
3320        // Add 5 rows of test data
3321        let test_data = vec![
3322            ("AAPL", 99.5, 1000),
3323            ("AAPL", 101.2, 1500),
3324            ("AAPL", 103.5, 2000),
3325            ("AAPL", 105.0, 1200),
3326            ("AAPL", 102.8, 1800),
3327        ];
3328
3329        for (symbol, close, volume) in test_data {
3330            table
3331                .add_row(DataRow::new(vec![
3332                    DataValue::String(symbol.to_string()),
3333                    DataValue::Float(close),
3334                    DataValue::Integer(volume),
3335                ]))
3336                .expect("Should add row successfully");
3337        }
3338
3339        Arc::new(table)
3340    }
3341}
3342
// Additional tests kept in a separate file (`query_engine_tests.rs`, located
// via the `#[path]` attribute); compiled only for test builds.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;