Skip to main content

sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29    TableFunction,
30};
31
/// Execution context for tracking table aliases and scope during query execution.
///
/// The only state held is a map from alias to table/CTE name; it is filled through
/// `register_alias` and consulted by `resolve_alias` and `resolve_column_index`.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Map from alias to actual table/CTE name.
    /// Example: "t" -> "#tmp_trades", "a" -> "data"
    alias_map: HashMap<String, String>,
}
39
40impl ExecutionContext {
41    /// Create a new empty execution context
42    pub fn new() -> Self {
43        Self {
44            alias_map: HashMap::new(),
45        }
46    }
47
48    /// Register a table alias
49    pub fn register_alias(&mut self, alias: String, table_name: String) {
50        debug!("Registering alias: {} -> {}", alias, table_name);
51        self.alias_map.insert(alias, table_name);
52    }
53
54    /// Resolve an alias to its actual table name
55    /// Returns the alias itself if not found in the map
56    pub fn resolve_alias(&self, name: &str) -> String {
57        self.alias_map
58            .get(name)
59            .cloned()
60            .unwrap_or_else(|| name.to_string())
61    }
62
63    /// Check if a name is a registered alias
64    pub fn is_alias(&self, name: &str) -> bool {
65        self.alias_map.contains_key(name)
66    }
67
68    /// Get a copy of all registered aliases
69    pub fn get_aliases(&self) -> HashMap<String, String> {
70        self.alias_map.clone()
71    }
72
73    /// Resolve a column reference to its index in the table, handling aliases
74    ///
75    /// This is the unified column resolution function that should be used by all
76    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
77    /// alias resolution behavior.
78    ///
79    /// Resolution strategy:
80    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
81    ///    a. Resolve the alias: t -> actual_table_name
82    ///    b. Try qualified lookup: "actual_table_name.amount"
83    ///    c. Fall back to unqualified: "amount"
84    /// 2. If column_ref has no prefix:
85    ///    a. Try simple column name lookup: "amount"
86    ///    b. Try as qualified name if it contains a dot: "table.column"
87    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88        if let Some(table_prefix) = &column_ref.table_prefix {
89            // Qualified column reference: resolve the alias first
90            let actual_table = self.resolve_alias(table_prefix);
91
92            // Try qualified lookup: "actual_table.column"
93            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95                debug!(
96                    "Resolved {}.{} -> qualified column '{}' at index {}",
97                    table_prefix, column_ref.name, qualified_name, idx
98                );
99                return Ok(idx);
100            }
101
102            // Fall back to unqualified lookup
103            if let Some(idx) = table.get_column_index(&column_ref.name) {
104                debug!(
105                    "Resolved {}.{} -> unqualified column '{}' at index {}",
106                    table_prefix, column_ref.name, column_ref.name, idx
107                );
108                return Ok(idx);
109            }
110
111            // Not found with either qualified or unqualified name
112            Err(anyhow!(
113                "Column '{}' not found. Table '{}' may not support qualified column names",
114                qualified_name,
115                actual_table
116            ))
117        } else {
118            // Unqualified column reference
119            if let Some(idx) = table.get_column_index(&column_ref.name) {
120                debug!(
121                    "Resolved unqualified column '{}' at index {}",
122                    column_ref.name, idx
123                );
124                return Ok(idx);
125            }
126
127            // If the column name contains a dot, try it as a qualified name
128            if column_ref.name.contains('.') {
129                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130                    debug!(
131                        "Resolved '{}' as qualified column at index {}",
132                        column_ref.name, idx
133                    );
134                    return Ok(idx);
135                }
136            }
137
138            // Column not found - provide helpful error
139            let suggestion = self.find_similar_column(table, &column_ref.name);
140            match suggestion {
141                Some(similar) => Err(anyhow!(
142                    "Column '{}' not found. Did you mean '{}'?",
143                    column_ref.name,
144                    similar
145                )),
146                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147            }
148        }
149    }
150
151    /// Find a similar column name using edit distance (for better error messages)
152    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153        let columns = table.column_names();
154        let mut best_match: Option<(String, usize)> = None;
155
156        for col in columns {
157            let distance = edit_distance(name, &col);
158            if distance <= 2 {
159                // Allow up to 2 character differences
160                match best_match {
161                    Some((_, best_dist)) if distance < best_dist => {
162                        best_match = Some((col.clone(), distance));
163                    }
164                    None => {
165                        best_match = Some((col.clone(), distance));
166                    }
167                    _ => {}
168                }
169            }
170        }
171
172        best_match.map(|(name, _)| name)
173    }
174}
175
/// `Default` yields the same empty context as [`ExecutionContext::new`].
impl Default for ExecutionContext {
    /// Delegates to `Self::new()`, i.e. a context with no aliases registered.
    fn default() -> Self {
        Self::new()
    }
}
181
/// Levenshtein distance between `a` and `b`, measured in `char`s.
///
/// Insertions, deletions and substitutions each cost 1. If either string is
/// empty the result is the length (in `char`s) of the other.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // `previous[j]` is the distance between the already-processed prefix of
    // `source` and the first `j` chars of `target`; `current` is the row being built.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, from) in source.iter().enumerate() {
        // Turning `row + 1` chars into the empty string takes `row + 1` deletions.
        current[0] = row + 1;

        for (col, to) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(from != to);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = min(min(deletion, insertion), substitution);
        }

        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the completed last row lives in `previous`.
    previous[target.len()]
}
222
/// Query engine that executes SQL directly on `DataTable`.
///
/// The engine carries only configuration, so it is cheap to clone (the derive is
/// relied on when handing a copy to `SubqueryExecutor`).
#[derive(Clone)]
pub struct QueryEngine {
    /// Case-insensitivity flag chosen by the constructor; `false` unless a
    /// constructor argument or a `BehaviorConfig` says otherwise.
    case_insensitive: bool,
    /// Date notation string. Every constructor visible in this file fills it from
    /// `get_date_notation()` rather than from a caller-supplied value.
    date_notation: String,
    /// Behavior configuration kept when built via `with_behavior_config`, else `None`.
    // NOTE(review): the underscore prefix suggests this is stored but never read — confirm.
    _behavior_config: Option<BehaviorConfig>,
}
230
/// `Default` yields the same engine as [`QueryEngine::new`].
impl Default for QueryEngine {
    /// Delegates to `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
}
236
237impl QueryEngine {
238    #[must_use]
239    pub fn new() -> Self {
240        Self {
241            case_insensitive: false,
242            date_notation: get_date_notation(),
243            _behavior_config: None,
244        }
245    }
246
247    #[must_use]
248    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249        let case_insensitive = config.case_insensitive_default;
250        // Use get_date_notation() to respect environment variable override
251        let date_notation = get_date_notation();
252        Self {
253            case_insensitive,
254            date_notation,
255            _behavior_config: Some(config),
256        }
257    }
258
259    #[must_use]
260    pub fn with_date_notation(_date_notation: String) -> Self {
261        Self {
262            case_insensitive: false,
263            date_notation: get_date_notation(), // Always use the global function
264            _behavior_config: None,
265        }
266    }
267
268    #[must_use]
269    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270        Self {
271            case_insensitive,
272            date_notation: get_date_notation(),
273            _behavior_config: None,
274        }
275    }
276
277    #[must_use]
278    pub fn with_case_insensitive_and_date_notation(
279        case_insensitive: bool,
280        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
281    ) -> Self {
282        Self {
283            case_insensitive,
284            date_notation: get_date_notation(), // Always use the global function
285            _behavior_config: None,
286        }
287    }
288
289    /// Find a column name similar to the given name using edit distance
290    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291        let columns = table.column_names();
292        let mut best_match: Option<(String, usize)> = None;
293
294        for col in columns {
295            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296            // Only suggest if distance is small (likely a typo)
297            // Allow up to 3 edits for longer names
298            let max_distance = if name.len() > 10 { 3 } else { 2 };
299            if distance <= max_distance {
300                match &best_match {
301                    None => best_match = Some((col, distance)),
302                    Some((_, best_dist)) if distance < *best_dist => {
303                        best_match = Some((col, distance));
304                    }
305                    _ => {}
306                }
307            }
308        }
309
310        best_match.map(|(name, _)| name)
311    }
312
313    /// Calculate Levenshtein edit distance between two strings
314    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315        let len1 = s1.len();
316        let len2 = s2.len();
317        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319        for i in 0..=len1 {
320            matrix[i][0] = i;
321        }
322        for j in 0..=len2 {
323            matrix[0][j] = j;
324        }
325
326        for (i, c1) in s1.chars().enumerate() {
327            for (j, c2) in s2.chars().enumerate() {
328                let cost = usize::from(c1 != c2);
329                matrix[i + 1][j + 1] = std::cmp::min(
330                    matrix[i][j + 1] + 1, // deletion
331                    std::cmp::min(
332                        matrix[i + 1][j] + 1, // insertion
333                        matrix[i][j] + cost,  // substitution
334                    ),
335                );
336            }
337        }
338
339        matrix[len1][len2]
340    }
341
342    /// Check if an expression contains UNNEST function call
343    fn contains_unnest(expr: &SqlExpression) -> bool {
344        match expr {
345            // Direct UNNEST variant
346            SqlExpression::Unnest { .. } => true,
347            SqlExpression::FunctionCall { name, args, .. } => {
348                if name.to_uppercase() == "UNNEST" {
349                    return true;
350                }
351                // Check recursively in function arguments
352                args.iter().any(Self::contains_unnest)
353            }
354            SqlExpression::BinaryOp { left, right, .. } => {
355                Self::contains_unnest(left) || Self::contains_unnest(right)
356            }
357            SqlExpression::Not { expr } => Self::contains_unnest(expr),
358            SqlExpression::CaseExpression {
359                when_branches,
360                else_branch,
361            } => {
362                when_branches.iter().any(|branch| {
363                    Self::contains_unnest(&branch.condition)
364                        || Self::contains_unnest(&branch.result)
365                }) || else_branch
366                    .as_ref()
367                    .map_or(false, |e| Self::contains_unnest(e))
368            }
369            SqlExpression::SimpleCaseExpression {
370                expr,
371                when_branches,
372                else_branch,
373            } => {
374                Self::contains_unnest(expr)
375                    || when_branches.iter().any(|branch| {
376                        Self::contains_unnest(&branch.value)
377                            || Self::contains_unnest(&branch.result)
378                    })
379                    || else_branch
380                        .as_ref()
381                        .map_or(false, |e| Self::contains_unnest(e))
382            }
383            SqlExpression::InList { expr, values } => {
384                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385            }
386            SqlExpression::NotInList { expr, values } => {
387                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388            }
389            SqlExpression::Between { expr, lower, upper } => {
390                Self::contains_unnest(expr)
391                    || Self::contains_unnest(lower)
392                    || Self::contains_unnest(upper)
393            }
394            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
397            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399            SqlExpression::ChainedMethodCall { base, args, .. } => {
400                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401            }
402            _ => false,
403        }
404    }
405
406    /// Collect all WindowSpecs from an expression (helper for pre-creating contexts)
407    fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408        match expr {
409            SqlExpression::WindowFunction {
410                window_spec, args, ..
411            } => {
412                // Add this window spec
413                specs.push(window_spec.clone());
414                // Recursively check arguments
415                for arg in args {
416                    Self::collect_window_specs(arg, specs);
417                }
418            }
419            SqlExpression::BinaryOp { left, right, .. } => {
420                Self::collect_window_specs(left, specs);
421                Self::collect_window_specs(right, specs);
422            }
423            SqlExpression::Not { expr } => {
424                Self::collect_window_specs(expr, specs);
425            }
426            SqlExpression::FunctionCall { args, .. } => {
427                for arg in args {
428                    Self::collect_window_specs(arg, specs);
429                }
430            }
431            SqlExpression::CaseExpression {
432                when_branches,
433                else_branch,
434            } => {
435                for branch in when_branches {
436                    Self::collect_window_specs(&branch.condition, specs);
437                    Self::collect_window_specs(&branch.result, specs);
438                }
439                if let Some(else_expr) = else_branch {
440                    Self::collect_window_specs(else_expr, specs);
441                }
442            }
443            SqlExpression::SimpleCaseExpression {
444                expr,
445                when_branches,
446                else_branch,
447            } => {
448                Self::collect_window_specs(expr, specs);
449                for branch in when_branches {
450                    Self::collect_window_specs(&branch.value, specs);
451                    Self::collect_window_specs(&branch.result, specs);
452                }
453                if let Some(else_expr) = else_branch {
454                    Self::collect_window_specs(else_expr, specs);
455                }
456            }
457            SqlExpression::InList { expr, values, .. } => {
458                Self::collect_window_specs(expr, specs);
459                for item in values {
460                    Self::collect_window_specs(item, specs);
461                }
462            }
463            SqlExpression::ChainedMethodCall { base, args, .. } => {
464                Self::collect_window_specs(base, specs);
465                for arg in args {
466                    Self::collect_window_specs(arg, specs);
467                }
468            }
469            // Leaf nodes - no recursion needed
470            SqlExpression::Column(_)
471            | SqlExpression::NumberLiteral(_)
472            | SqlExpression::StringLiteral(_)
473            | SqlExpression::BooleanLiteral(_)
474            | SqlExpression::Null
475            | SqlExpression::DateTimeToday { .. }
476            | SqlExpression::DateTimeConstructor { .. }
477            | SqlExpression::MethodCall { .. } => {}
478            // Catch-all for any other variants
479            _ => {}
480        }
481    }
482
483    /// Check if an expression contains a window function
484    fn contains_window_function(expr: &SqlExpression) -> bool {
485        match expr {
486            SqlExpression::WindowFunction { .. } => true,
487            SqlExpression::BinaryOp { left, right, .. } => {
488                Self::contains_window_function(left) || Self::contains_window_function(right)
489            }
490            SqlExpression::Not { expr } => Self::contains_window_function(expr),
491            SqlExpression::FunctionCall { args, .. } => {
492                args.iter().any(Self::contains_window_function)
493            }
494            SqlExpression::CaseExpression {
495                when_branches,
496                else_branch,
497            } => {
498                when_branches.iter().any(|branch| {
499                    Self::contains_window_function(&branch.condition)
500                        || Self::contains_window_function(&branch.result)
501                }) || else_branch
502                    .as_ref()
503                    .map_or(false, |e| Self::contains_window_function(e))
504            }
505            SqlExpression::SimpleCaseExpression {
506                expr,
507                when_branches,
508                else_branch,
509            } => {
510                Self::contains_window_function(expr)
511                    || when_branches.iter().any(|branch| {
512                        Self::contains_window_function(&branch.value)
513                            || Self::contains_window_function(&branch.result)
514                    })
515                    || else_branch
516                        .as_ref()
517                        .map_or(false, |e| Self::contains_window_function(e))
518            }
519            SqlExpression::InList { expr, values } => {
520                Self::contains_window_function(expr)
521                    || values.iter().any(Self::contains_window_function)
522            }
523            SqlExpression::NotInList { expr, values } => {
524                Self::contains_window_function(expr)
525                    || values.iter().any(Self::contains_window_function)
526            }
527            SqlExpression::Between { expr, lower, upper } => {
528                Self::contains_window_function(expr)
529                    || Self::contains_window_function(lower)
530                    || Self::contains_window_function(upper)
531            }
532            SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533            SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534            SqlExpression::MethodCall { args, .. } => {
535                args.iter().any(Self::contains_window_function)
536            }
537            SqlExpression::ChainedMethodCall { base, args, .. } => {
538                Self::contains_window_function(base)
539                    || args.iter().any(Self::contains_window_function)
540            }
541            _ => false,
542        }
543    }
544
545    /// Extract all window function specifications from select items
546    fn extract_window_specs(
547        items: &[SelectItem],
548    ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549        let mut specs = Vec::new();
550        for (idx, item) in items.iter().enumerate() {
551            if let SelectItem::Expression { expr, .. } = item {
552                Self::collect_window_function_specs(expr, idx, &mut specs);
553            }
554        }
555        specs
556    }
557
558    /// Recursively collect window function specs from an expression
559    fn collect_window_function_specs(
560        expr: &SqlExpression,
561        output_column_index: usize,
562        specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563    ) {
564        match expr {
565            SqlExpression::WindowFunction {
566                name,
567                args,
568                window_spec,
569            } => {
570                specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571                    spec: window_spec.clone(),
572                    function_name: name.clone(),
573                    args: args.clone(),
574                    output_column_index,
575                });
576            }
577            SqlExpression::BinaryOp { left, right, .. } => {
578                Self::collect_window_function_specs(left, output_column_index, specs);
579                Self::collect_window_function_specs(right, output_column_index, specs);
580            }
581            SqlExpression::Not { expr } => {
582                Self::collect_window_function_specs(expr, output_column_index, specs);
583            }
584            SqlExpression::FunctionCall { args, .. } => {
585                for arg in args {
586                    Self::collect_window_function_specs(arg, output_column_index, specs);
587                }
588            }
589            SqlExpression::CaseExpression {
590                when_branches,
591                else_branch,
592            } => {
593                for branch in when_branches {
594                    Self::collect_window_function_specs(
595                        &branch.condition,
596                        output_column_index,
597                        specs,
598                    );
599                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600                }
601                if let Some(e) = else_branch {
602                    Self::collect_window_function_specs(e, output_column_index, specs);
603                }
604            }
605            SqlExpression::SimpleCaseExpression {
606                expr,
607                when_branches,
608                else_branch,
609            } => {
610                Self::collect_window_function_specs(expr, output_column_index, specs);
611                for branch in when_branches {
612                    Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614                }
615                if let Some(e) = else_branch {
616                    Self::collect_window_function_specs(e, output_column_index, specs);
617                }
618            }
619            SqlExpression::InList { expr, values } => {
620                Self::collect_window_function_specs(expr, output_column_index, specs);
621                for val in values {
622                    Self::collect_window_function_specs(val, output_column_index, specs);
623                }
624            }
625            SqlExpression::NotInList { expr, values } => {
626                Self::collect_window_function_specs(expr, output_column_index, specs);
627                for val in values {
628                    Self::collect_window_function_specs(val, output_column_index, specs);
629                }
630            }
631            SqlExpression::Between { expr, lower, upper } => {
632                Self::collect_window_function_specs(expr, output_column_index, specs);
633                Self::collect_window_function_specs(lower, output_column_index, specs);
634                Self::collect_window_function_specs(upper, output_column_index, specs);
635            }
636            SqlExpression::InSubquery { expr, .. } => {
637                Self::collect_window_function_specs(expr, output_column_index, specs);
638            }
639            SqlExpression::NotInSubquery { expr, .. } => {
640                Self::collect_window_function_specs(expr, output_column_index, specs);
641            }
642            SqlExpression::MethodCall { args, .. } => {
643                for arg in args {
644                    Self::collect_window_function_specs(arg, output_column_index, specs);
645                }
646            }
647            SqlExpression::ChainedMethodCall { base, args, .. } => {
648                Self::collect_window_function_specs(base, output_column_index, specs);
649                for arg in args {
650                    Self::collect_window_function_specs(arg, output_column_index, specs);
651                }
652            }
653            _ => {} // Other expression types don't contain window functions
654        }
655    }
656
657    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
658    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659        let (view, _plan) = self.execute_with_plan(table, sql)?;
660        Ok(view)
661    }
662
663    /// Execute a SQL query with optional temp table registry access
664    pub fn execute_with_temp_tables(
665        &self,
666        table: Arc<DataTable>,
667        sql: &str,
668        temp_tables: Option<&TempTableRegistry>,
669    ) -> Result<DataView> {
670        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671        Ok(view)
672    }
673
674    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
675    pub fn execute_statement(
676        &self,
677        table: Arc<DataTable>,
678        statement: SelectStatement,
679    ) -> Result<DataView> {
680        self.execute_statement_with_temp_tables(table, statement, None)
681    }
682
    /// Execute a parsed SelectStatement with optional temp table access.
    ///
    /// Steps, in order:
    /// 1. Seed a name -> view context with every table in `temp_tables` (if given).
    /// 2. Evaluate each CTE of `statement` in declaration order and add it to the
    ///    same context, so a CTE can see temp tables and earlier CTEs.
    /// 3. Run the subquery executor over the statement with a copy of that context.
    /// 4. Build the final view from the processed statement.
    ///
    /// # Errors
    /// Returns an error if any CTE query, web fetch, view materialization, subquery
    /// execution or the final view build fails. `FILE` CTEs always return an error
    /// because their execution is not implemented yet.
    pub fn execute_statement_with_temp_tables(
        &self,
        table: Arc<DataTable>,
        statement: SelectStatement,
        temp_tables: Option<&TempTableRegistry>,
    ) -> Result<DataView> {
        // First process CTEs to build context
        let mut cte_context = HashMap::new();

        // Add temp tables to CTE context if provided
        if let Some(temp_registry) = temp_tables {
            for table_name in temp_registry.list_tables() {
                // Names for which `get` returns `None` are skipped silently.
                if let Some(temp_table) = temp_registry.get(&table_name) {
                    debug!("Adding temp table {} to CTE context", table_name);
                    let view = DataView::new(temp_table);
                    cte_context.insert(table_name, Arc::new(view));
                }
            }
        }

        for cte in &statement.ctes {
            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
            // Execute the CTE based on its type
            let cte_result = match &cte.cte_type {
                CTEType::Standard(query) => {
                    // Execute the CTE query (it might reference earlier CTEs)
                    let view = self.build_view_with_context(
                        table.clone(),
                        query.clone(),
                        &mut cte_context,
                    )?;

                    // Materialize the view and enrich columns with qualified names
                    let mut materialized = self.materialize_view(view)?;

                    // Enrich columns with qualified names for proper scoping:
                    // each column becomes addressable as "<cte name>.<column name>".
                    for column in materialized.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    DataView::new(Arc::new(materialized))
                }
                CTEType::Web(web_spec) => {
                    // Fetch data from URL
                    use crate::web::http_fetcher::WebDataFetcher;

                    let fetcher = WebDataFetcher::new()?;
                    // Pass None for query context (no full SQL available in these contexts)
                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;

                    // Enrich columns with qualified names for proper scoping
                    // (same "<cte name>.<column name>" scheme as standard CTEs).
                    for column in data_table.columns_mut() {
                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                        column.source_table = Some(cte.name.clone());
                    }

                    // Convert DataTable to DataView
                    DataView::new(Arc::new(data_table))
                }
                CTEType::File(_file_spec) => {
                    // TODO: Phase 1 walker not yet implemented — see docs/FILE_CTE_DESIGN.md
                    return Err(anyhow!(
                        "FILE CTE execution not yet implemented (parser-only milestone)"
                    ));
                }
            };
            // Store the result in the context for later use.
            // A CTE whose name equals an existing entry (e.g. a temp table) replaces it.
            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
            debug!(
                "QueryEngine: CTE '{}' pre-processed, stored in context",
                cte.name
            );
        }

        // Now process subqueries with CTE context available.
        // The executor receives its own clone of the engine and of the context.
        let mut subquery_executor =
            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
        let processed_statement = subquery_executor.execute_subqueries(&statement)?;

        // Build the view with the same CTE context
        self.build_view_with_context(table, processed_statement, &mut cte_context)
    }
767
768    /// Execute a statement with provided CTE context (for subqueries)
769    pub fn execute_statement_with_cte_context(
770        &self,
771        table: Arc<DataTable>,
772        statement: SelectStatement,
773        cte_context: &HashMap<String, Arc<DataView>>,
774    ) -> Result<DataView> {
775        // Clone the context so we can add any CTEs from this statement
776        let mut local_context = cte_context.clone();
777
778        // Process any CTEs in this statement (they might be nested)
779        for cte in &statement.ctes {
780            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
781            let cte_result = match &cte.cte_type {
782                CTEType::Standard(query) => {
783                    let view = self.build_view_with_context(
784                        table.clone(),
785                        query.clone(),
786                        &mut local_context,
787                    )?;
788
789                    // Materialize the view and enrich columns with qualified names
790                    let mut materialized = self.materialize_view(view)?;
791
792                    // Enrich columns with qualified names for proper scoping
793                    for column in materialized.columns_mut() {
794                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
795                        column.source_table = Some(cte.name.clone());
796                    }
797
798                    DataView::new(Arc::new(materialized))
799                }
800                CTEType::Web(web_spec) => {
801                    // Fetch data from URL
802                    use crate::web::http_fetcher::WebDataFetcher;
803
804                    let fetcher = WebDataFetcher::new()?;
805                    // Pass None for query context (no full SQL available in these contexts)
806                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
807
808                    // Enrich columns with qualified names for proper scoping
809                    for column in data_table.columns_mut() {
810                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
811                        column.source_table = Some(cte.name.clone());
812                    }
813
814                    // Convert DataTable to DataView
815                    DataView::new(Arc::new(data_table))
816                }
817                CTEType::File(_file_spec) => {
818                    // TODO: Phase 1 walker not yet implemented — see docs/FILE_CTE_DESIGN.md
819                    return Err(anyhow!(
820                        "FILE CTE execution not yet implemented (parser-only milestone)"
821                    ));
822                }
823            };
824            local_context.insert(cte.name.clone(), Arc::new(cte_result));
825        }
826
827        // Process subqueries with the complete context
828        let mut subquery_executor =
829            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
830        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
831
832        // Build the view
833        self.build_view_with_context(table, processed_statement, &mut local_context)
834    }
835
836    /// Execute a query and return both the result and the execution plan
837    pub fn execute_with_plan(
838        &self,
839        table: Arc<DataTable>,
840        sql: &str,
841    ) -> Result<(DataView, ExecutionPlan)> {
842        self.execute_with_plan_and_temp_tables(table, sql, None)
843    }
844
845    /// Execute a query with temp tables and return both the result and the execution plan
846    pub fn execute_with_plan_and_temp_tables(
847        &self,
848        table: Arc<DataTable>,
849        sql: &str,
850        temp_tables: Option<&TempTableRegistry>,
851    ) -> Result<(DataView, ExecutionPlan)> {
852        let mut plan_builder = ExecutionPlanBuilder::new();
853        let start_time = Instant::now();
854
855        // Parse the SQL query
856        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
857        plan_builder.add_detail(format!("Query: {}", sql));
858        let mut parser = Parser::new(sql);
859        let statement = parser
860            .parse()
861            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
862        plan_builder.add_detail(format!("Parsed successfully"));
863        if let Some(ref from_source) = statement.from_source {
864            match from_source {
865                TableSource::Table(name) => {
866                    plan_builder.add_detail(format!("FROM: {}", name));
867                }
868                TableSource::DerivedTable { alias, .. } => {
869                    plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
870                }
871                TableSource::Pivot { .. } => {
872                    plan_builder.add_detail("FROM: PIVOT".to_string());
873                }
874            }
875        }
876        if statement.where_clause.is_some() {
877            plan_builder.add_detail("WHERE clause present".to_string());
878        }
879        plan_builder.end_step();
880
881        // First process CTEs to build context
882        let mut cte_context = HashMap::new();
883
884        // Add temp tables to CTE context if provided
885        if let Some(temp_registry) = temp_tables {
886            for table_name in temp_registry.list_tables() {
887                if let Some(temp_table) = temp_registry.get(&table_name) {
888                    debug!("Adding temp table {} to CTE context", table_name);
889                    let view = DataView::new(temp_table);
890                    cte_context.insert(table_name, Arc::new(view));
891                }
892            }
893        }
894
895        if !statement.ctes.is_empty() {
896            plan_builder.begin_step(
897                StepType::CTE,
898                format!("Process {} CTEs", statement.ctes.len()),
899            );
900
901            for cte in &statement.ctes {
902                let cte_start = Instant::now();
903                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
904
905                let cte_result = match &cte.cte_type {
906                    CTEType::Standard(query) => {
907                        // Add CTE query details
908                        if let Some(ref from_source) = query.from_source {
909                            match from_source {
910                                TableSource::Table(name) => {
911                                    plan_builder.add_detail(format!("Source: {}", name));
912                                }
913                                TableSource::DerivedTable { alias, .. } => {
914                                    plan_builder
915                                        .add_detail(format!("Source: derived table ({})", alias));
916                                }
917                                TableSource::Pivot { .. } => {
918                                    plan_builder.add_detail("Source: PIVOT".to_string());
919                                }
920                            }
921                        }
922                        if query.where_clause.is_some() {
923                            plan_builder.add_detail("Has WHERE clause".to_string());
924                        }
925                        if query.group_by.is_some() {
926                            plan_builder.add_detail("Has GROUP BY".to_string());
927                        }
928
929                        debug!(
930                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
931                            cte.name,
932                            cte_context.keys().collect::<Vec<_>>()
933                        );
934
935                        // Process subqueries in the CTE's query FIRST
936                        // This allows the subqueries to see all previously defined CTEs
937                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
938                            self.clone(),
939                            table.clone(),
940                            cte_context.clone(),
941                        );
942                        let processed_query = subquery_executor.execute_subqueries(query)?;
943
944                        let view = self.build_view_with_context(
945                            table.clone(),
946                            processed_query,
947                            &mut cte_context,
948                        )?;
949
950                        // Materialize the view and enrich columns with qualified names
951                        let mut materialized = self.materialize_view(view)?;
952
953                        // Enrich columns with qualified names for proper scoping
954                        for column in materialized.columns_mut() {
955                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
956                            column.source_table = Some(cte.name.clone());
957                        }
958
959                        DataView::new(Arc::new(materialized))
960                    }
961                    CTEType::Web(web_spec) => {
962                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
963                        if let Some(format) = &web_spec.format {
964                            plan_builder.add_detail(format!("Format: {:?}", format));
965                        }
966                        if let Some(cache) = web_spec.cache_seconds {
967                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
968                        }
969
970                        // Fetch data from URL
971                        use crate::web::http_fetcher::WebDataFetcher;
972
973                        let fetcher = WebDataFetcher::new()?;
974                        // Pass None for query context - each WEB CTE is independent
975                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
976
977                        // Enrich columns with qualified names for proper scoping
978                        for column in data_table.columns_mut() {
979                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
980                            column.source_table = Some(cte.name.clone());
981                        }
982
983                        // Convert DataTable to DataView
984                        DataView::new(Arc::new(data_table))
985                    }
986                    CTEType::File(file_spec) => {
987                        plan_builder.add_detail(format!("PATH: {}", file_spec.path));
988                        if file_spec.recursive {
989                            plan_builder.add_detail("RECURSIVE".to_string());
990                        }
991                        if let Some(ref g) = file_spec.glob {
992                            plan_builder.add_detail(format!("GLOB: {}", g));
993                        }
994                        if let Some(d) = file_spec.max_depth {
995                            plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
996                        }
997                        // TODO: Phase 1 walker not yet implemented
998                        return Err(anyhow!(
999                            "FILE CTE execution not yet implemented (parser-only milestone)"
1000                        ));
1001                    }
1002                };
1003
1004                // Record CTE statistics
1005                plan_builder.set_rows_out(cte_result.row_count());
1006                plan_builder.add_detail(format!(
1007                    "Result: {} rows, {} columns",
1008                    cte_result.row_count(),
1009                    cte_result.column_count()
1010                ));
1011                plan_builder.add_detail(format!(
1012                    "Execution time: {:.3}ms",
1013                    cte_start.elapsed().as_secs_f64() * 1000.0
1014                ));
1015
1016                debug!(
1017                    "QueryEngine: Storing CTE '{}' in context with {} rows",
1018                    cte.name,
1019                    cte_result.row_count()
1020                );
1021                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1022                plan_builder.end_step();
1023            }
1024
1025            plan_builder.add_detail(format!(
1026                "All {} CTEs cached in context",
1027                statement.ctes.len()
1028            ));
1029            plan_builder.end_step();
1030        }
1031
1032        // Process subqueries in the statement with CTE context
1033        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1034        let mut subquery_executor =
1035            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1036
1037        // Check if there are subqueries to process
1038        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1039            // This is a simplified check - in reality we'd need to walk the AST
1040            format!("{:?}", w).contains("Subquery")
1041        });
1042
1043        if has_subqueries {
1044            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1045        }
1046
1047        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1048
1049        if has_subqueries {
1050            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1051        } else {
1052            plan_builder.add_detail("No subqueries to process".to_string());
1053        }
1054
1055        plan_builder.end_step();
1056        let result = self.build_view_with_context_and_plan(
1057            table,
1058            processed_statement,
1059            &mut cte_context,
1060            &mut plan_builder,
1061        )?;
1062
1063        let total_duration = start_time.elapsed();
1064        info!(
1065            "Query execution complete: total={:?}, rows={}",
1066            total_duration,
1067            result.row_count()
1068        );
1069
1070        let plan = plan_builder.build();
1071        Ok((result, plan))
1072    }
1073
1074    /// Build a `DataView` from a parsed SQL statement
1075    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1076        let mut cte_context = HashMap::new();
1077        self.build_view_with_context(table, statement, &mut cte_context)
1078    }
1079
1080    /// Build a DataView from a SelectStatement with CTE context
1081    fn build_view_with_context(
1082        &self,
1083        table: Arc<DataTable>,
1084        statement: SelectStatement,
1085        cte_context: &mut HashMap<String, Arc<DataView>>,
1086    ) -> Result<DataView> {
1087        let mut dummy_plan = ExecutionPlanBuilder::new();
1088        let mut exec_context = ExecutionContext::new();
1089        self.build_view_with_context_and_plan_and_exec(
1090            table,
1091            statement,
1092            cte_context,
1093            &mut dummy_plan,
1094            &mut exec_context,
1095        )
1096    }
1097
1098    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
1099    fn build_view_with_context_and_plan(
1100        &self,
1101        table: Arc<DataTable>,
1102        statement: SelectStatement,
1103        cte_context: &mut HashMap<String, Arc<DataView>>,
1104        plan: &mut ExecutionPlanBuilder,
1105    ) -> Result<DataView> {
1106        let mut exec_context = ExecutionContext::new();
1107        self.build_view_with_context_and_plan_and_exec(
1108            table,
1109            statement,
1110            cte_context,
1111            plan,
1112            &mut exec_context,
1113        )
1114    }
1115
1116    /// Build a DataView with CTE context, execution plan, and alias resolution context
1117    fn build_view_with_context_and_plan_and_exec(
1118        &self,
1119        table: Arc<DataTable>,
1120        statement: SelectStatement,
1121        cte_context: &mut HashMap<String, Arc<DataView>>,
1122        plan: &mut ExecutionPlanBuilder,
1123        exec_context: &mut ExecutionContext,
1124    ) -> Result<DataView> {
1125        // First, process any CTEs that aren't already in the context
1126        for cte in &statement.ctes {
1127            // Skip if already processed (e.g., by execute_select for WEB CTEs)
1128            if cte_context.contains_key(&cte.name) {
1129                debug!(
1130                    "QueryEngine: CTE '{}' already in context, skipping",
1131                    cte.name
1132                );
1133                continue;
1134            }
1135
1136            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1137            debug!(
1138                "QueryEngine: Available CTEs for '{}': {:?}",
1139                cte.name,
1140                cte_context.keys().collect::<Vec<_>>()
1141            );
1142
1143            // Execute the CTE query (it might reference earlier CTEs)
1144            let cte_result = match &cte.cte_type {
1145                CTEType::Standard(query) => {
1146                    let view =
1147                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1148
1149                    // Materialize the view and enrich columns with qualified names
1150                    let mut materialized = self.materialize_view(view)?;
1151
1152                    // Enrich columns with qualified names for proper scoping
1153                    for column in materialized.columns_mut() {
1154                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1155                        column.source_table = Some(cte.name.clone());
1156                    }
1157
1158                    DataView::new(Arc::new(materialized))
1159                }
1160                CTEType::Web(_web_spec) => {
1161                    // Web CTEs should have been processed earlier in execute_select
1162                    return Err(anyhow!(
1163                        "Web CTEs should be processed in execute_select method"
1164                    ));
1165                }
1166                CTEType::File(_file_spec) => {
1167                    // FILE CTEs (like WEB) should be processed earlier in execute_select
1168                    return Err(anyhow!(
1169                        "FILE CTEs should be processed in execute_select method"
1170                    ));
1171                }
1172            };
1173
1174            // Store the result in the context for later use
1175            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1176            debug!(
1177                "QueryEngine: CTE '{}' processed, stored in context",
1178                cte.name
1179            );
1180        }
1181
1182        // Determine the source table for the main query
1183        let source_table = if let Some(ref from_source) = statement.from_source {
1184            match from_source {
1185                TableSource::Table(table_name) => {
1186                    // Check if this references a CTE
1187                    if let Some(cte_view) = cte_context.get(table_name) {
1188                        debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1189                        // Materialize the CTE view as a table
1190                        let mut materialized = self.materialize_view((**cte_view).clone())?;
1191
1192                        // Apply alias to qualified column names if present
1193                        #[allow(deprecated)]
1194                        if let Some(ref alias) = statement.from_alias {
1195                            debug!(
1196                                "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1197                                alias, table_name
1198                            );
1199                            for column in materialized.columns_mut() {
1200                                // Replace the CTE name with the alias in qualified names
1201                                if let Some(ref qualified_name) = column.qualified_name {
1202                                    if qualified_name.starts_with(&format!("{}.", table_name)) {
1203                                        column.qualified_name = Some(qualified_name.replace(
1204                                            &format!("{}.", table_name),
1205                                            &format!("{}.", alias),
1206                                        ));
1207                                    }
1208                                }
1209                                // Update source table to reflect the alias
1210                                if column.source_table.as_ref() == Some(table_name) {
1211                                    column.source_table = Some(alias.clone());
1212                                }
1213                            }
1214                        }
1215
1216                        Arc::new(materialized)
1217                    } else {
1218                        // Regular table reference - use the provided table
1219                        table.clone()
1220                    }
1221                }
1222                TableSource::DerivedTable { query, alias } => {
1223                    // Execute the subquery and use its result as the source
1224                    debug!(
1225                        "QueryEngine: Processing FROM derived table (alias: {})",
1226                        alias
1227                    );
1228                    let subquery_result =
1229                        self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1230
1231                    // Convert the DataView to a DataTable for use as source
1232                    // This materializes the subquery result
1233                    let mut materialized = self.materialize_view(subquery_result)?;
1234
1235                    // Apply the alias to all columns in the derived table
1236                    // Note: We set source_table but keep the column names unqualified
1237                    // so they can be referenced without the table prefix
1238                    for column in materialized.columns_mut() {
1239                        column.source_table = Some(alias.clone());
1240                    }
1241
1242                    Arc::new(materialized)
1243                }
1244                TableSource::Pivot { .. } => {
1245                    // PIVOT should have been expanded by PivotExpander transformer
1246                    return Err(anyhow!(
1247                        "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1248                    ));
1249                }
1250            }
1251        } else {
1252            // Fallback to deprecated fields for backward compatibility
1253            #[allow(deprecated)]
1254            if let Some(ref table_func) = statement.from_function {
1255                // Handle table functions like RANGE()
1256                debug!("QueryEngine: Processing table function (deprecated field)...");
1257                match table_func {
1258                    TableFunction::Generator { name, args } => {
1259                        // Use the generator registry to create the table
1260                        use crate::sql::generators::GeneratorRegistry;
1261
1262                        // Create generator registry (could be cached in QueryEngine)
1263                        let registry = GeneratorRegistry::new();
1264
1265                        if let Some(generator) = registry.get(name) {
1266                            // Evaluate arguments
1267                            let mut evaluator = ArithmeticEvaluator::with_date_notation(
1268                                &table,
1269                                self.date_notation.clone(),
1270                            );
1271                            let dummy_row = 0;
1272
1273                            let mut evaluated_args = Vec::new();
1274                            for arg in args {
1275                                evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1276                            }
1277
1278                            // Generate the table
1279                            generator.generate(evaluated_args)?
1280                        } else {
1281                            return Err(anyhow!("Unknown generator function: {}", name));
1282                        }
1283                    }
1284                }
1285            } else {
1286                #[allow(deprecated)]
1287                if let Some(ref subquery) = statement.from_subquery {
1288                    // Execute the subquery and use its result as the source
1289                    debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1290                    let subquery_result = self.build_view_with_context(
1291                        table.clone(),
1292                        *subquery.clone(),
1293                        cte_context,
1294                    )?;
1295
1296                    // Convert the DataView to a DataTable for use as source
1297                    // This materializes the subquery result
1298                    let materialized = self.materialize_view(subquery_result)?;
1299                    Arc::new(materialized)
1300                } else {
1301                    #[allow(deprecated)]
1302                    if let Some(ref table_name) = statement.from_table {
1303                        // Check if this references a CTE
1304                        if let Some(cte_view) = cte_context.get(table_name) {
1305                            debug!(
1306                                "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1307                                table_name
1308                            );
1309                            // Materialize the CTE view as a table
1310                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1311
1312                            // Apply alias to qualified column names if present
1313                            #[allow(deprecated)]
1314                            if let Some(ref alias) = statement.from_alias {
1315                                debug!(
1316                                    "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1317                                    alias, table_name
1318                                );
1319                                for column in materialized.columns_mut() {
1320                                    // Replace the CTE name with the alias in qualified names
1321                                    if let Some(ref qualified_name) = column.qualified_name {
1322                                        if qualified_name.starts_with(&format!("{}.", table_name)) {
1323                                            column.qualified_name = Some(qualified_name.replace(
1324                                                &format!("{}.", table_name),
1325                                                &format!("{}.", alias),
1326                                            ));
1327                                        }
1328                                    }
1329                                    // Update source table to reflect the alias
1330                                    if column.source_table.as_ref() == Some(table_name) {
1331                                        column.source_table = Some(alias.clone());
1332                                    }
1333                                }
1334                            }
1335
1336                            Arc::new(materialized)
1337                        } else {
1338                            // Regular table reference - use the provided table
1339                            table.clone()
1340                        }
1341                    } else {
1342                        // No FROM clause - use the provided table
1343                        table.clone()
1344                    }
1345                }
1346            }
1347        };
1348
1349        // Register alias in execution context if present
1350        #[allow(deprecated)]
1351        if let Some(ref alias) = statement.from_alias {
1352            #[allow(deprecated)]
1353            if let Some(ref table_name) = statement.from_table {
1354                exec_context.register_alias(alias.clone(), table_name.clone());
1355            }
1356        }
1357
1358        // Process JOINs if present
1359        let final_table = if !statement.joins.is_empty() {
1360            plan.begin_step(
1361                StepType::Join,
1362                format!("Process {} JOINs", statement.joins.len()),
1363            );
1364            plan.set_rows_in(source_table.row_count());
1365
1366            let join_executor = HashJoinExecutor::new(self.case_insensitive);
1367            let mut current_table = source_table;
1368
1369            for (idx, join_clause) in statement.joins.iter().enumerate() {
1370                let join_start = Instant::now();
1371                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1372                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1373                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1374                plan.add_detail(format!(
1375                    "Executing {:?} JOIN on {} condition(s)",
1376                    join_clause.join_type,
1377                    join_clause.condition.conditions.len()
1378                ));
1379
1380                // Resolve the right table for the join
1381                let right_table = match &join_clause.table {
1382                    TableSource::Table(name) => {
1383                        // Check if it's a CTE reference
1384                        if let Some(cte_view) = cte_context.get(name) {
1385                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1386
1387                            // Apply alias to qualified column names if present
1388                            if let Some(ref alias) = join_clause.alias {
1389                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1390                                for column in materialized.columns_mut() {
1391                                    // Replace the CTE name with the alias in qualified names
1392                                    if let Some(ref qualified_name) = column.qualified_name {
1393                                        if qualified_name.starts_with(&format!("{}.", name)) {
1394                                            column.qualified_name = Some(qualified_name.replace(
1395                                                &format!("{}.", name),
1396                                                &format!("{}.", alias),
1397                                            ));
1398                                        }
1399                                    }
1400                                    // Update source table to reflect the alias
1401                                    if column.source_table.as_ref() == Some(name) {
1402                                        column.source_table = Some(alias.clone());
1403                                    }
1404                                }
1405                            }
1406
1407                            Arc::new(materialized)
1408                        } else {
1409                            // For now, we need the actual table data
1410                            // In a real implementation, this would load from file
1411                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1412                        }
1413                    }
1414                    TableSource::DerivedTable { query, alias: _ } => {
1415                        // Execute the subquery
1416                        let subquery_result = self.build_view_with_context(
1417                            table.clone(),
1418                            *query.clone(),
1419                            cte_context,
1420                        )?;
1421                        let materialized = self.materialize_view(subquery_result)?;
1422                        Arc::new(materialized)
1423                    }
1424                    TableSource::Pivot { .. } => {
1425                        // PIVOT in JOIN is not supported yet (will be handled by transformer)
1426                        return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1427                    }
1428                };
1429
1430                // Execute the join
1431                let joined = join_executor.execute_join(
1432                    current_table.clone(),
1433                    join_clause,
1434                    right_table.clone(),
1435                )?;
1436
1437                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1438                plan.set_rows_out(joined.row_count());
1439                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1440                plan.add_detail(format!(
1441                    "Join time: {:.3}ms",
1442                    join_start.elapsed().as_secs_f64() * 1000.0
1443                ));
1444                plan.end_step();
1445
1446                current_table = Arc::new(joined);
1447            }
1448
1449            plan.set_rows_out(current_table.row_count());
1450            plan.add_detail(format!(
1451                "Final result after all joins: {} rows",
1452                current_table.row_count()
1453            ));
1454            plan.end_step();
1455            current_table
1456        } else {
1457            source_table
1458        };
1459
1460        // Continue with the existing build_view logic but using final_table
1461        self.build_view_internal_with_plan_and_exec(
1462            final_table,
1463            statement,
1464            plan,
1465            Some(exec_context),
1466        )
1467    }
1468
1469    /// Materialize a DataView into a new DataTable
1470    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1471        let source = view.source();
1472        let mut result_table = DataTable::new("derived");
1473
1474        // Get the visible columns from the view
1475        let visible_cols = view.visible_column_indices().to_vec();
1476
1477        // Copy column definitions
1478        for col_idx in &visible_cols {
1479            let col = &source.columns[*col_idx];
1480            let new_col = DataColumn {
1481                name: col.name.clone(),
1482                data_type: col.data_type.clone(),
1483                nullable: col.nullable,
1484                unique_values: col.unique_values,
1485                null_count: col.null_count,
1486                metadata: col.metadata.clone(),
1487                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1488                source_table: col.source_table.clone(),     // Preserve source table
1489            };
1490            result_table.add_column(new_col);
1491        }
1492
1493        // Copy visible rows
1494        for row_idx in view.visible_row_indices() {
1495            let source_row = &source.rows[*row_idx];
1496            let mut new_row = DataRow { values: Vec::new() };
1497
1498            for col_idx in &visible_cols {
1499                new_row.values.push(source_row.values[*col_idx].clone());
1500            }
1501
1502            result_table.add_row(new_row);
1503        }
1504
1505        Ok(result_table)
1506    }
1507
1508    fn build_view_internal(
1509        &self,
1510        table: Arc<DataTable>,
1511        statement: SelectStatement,
1512    ) -> Result<DataView> {
1513        let mut dummy_plan = ExecutionPlanBuilder::new();
1514        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1515    }
1516
1517    fn build_view_internal_with_plan(
1518        &self,
1519        table: Arc<DataTable>,
1520        statement: SelectStatement,
1521        plan: &mut ExecutionPlanBuilder,
1522    ) -> Result<DataView> {
1523        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1524    }
1525
1526    fn build_view_internal_with_plan_and_exec(
1527        &self,
1528        table: Arc<DataTable>,
1529        statement: SelectStatement,
1530        plan: &mut ExecutionPlanBuilder,
1531        exec_context: Option<&ExecutionContext>,
1532    ) -> Result<DataView> {
1533        debug!(
1534            "QueryEngine::build_view - select_items: {:?}",
1535            statement.select_items
1536        );
1537        debug!(
1538            "QueryEngine::build_view - where_clause: {:?}",
1539            statement.where_clause
1540        );
1541
1542        // Start with all rows visible
1543        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1544
1545        // Apply WHERE clause filtering using recursive evaluator
1546        if let Some(where_clause) = &statement.where_clause {
1547            let total_rows = table.row_count();
1548            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1549            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1550
1551            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1552            plan.set_rows_in(total_rows);
1553            plan.add_detail(format!("Input: {} rows", total_rows));
1554
1555            // Add details about WHERE conditions
1556            for condition in &where_clause.conditions {
1557                plan.add_detail(format!("Condition: {:?}", condition.expr));
1558            }
1559
1560            let filter_start = Instant::now();
1561            // Create an evaluation context for caching compiled regexes
1562            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1563
1564            // Create evaluator ONCE before the loop for performance
1565            let mut evaluator = if let Some(exec_ctx) = exec_context {
1566                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1567                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1568            } else {
1569                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1570            };
1571
1572            // Filter visible rows based on WHERE clause
1573            let mut filtered_rows = Vec::new();
1574            for row_idx in visible_rows {
1575                // Only log for first few rows to avoid performance impact
1576                if row_idx < 3 {
1577                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1578                }
1579
1580                match evaluator.evaluate(where_clause, row_idx) {
1581                    Ok(result) => {
1582                        if row_idx < 3 {
1583                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1584                        }
1585                        if result {
1586                            filtered_rows.push(row_idx);
1587                        }
1588                    }
1589                    Err(e) => {
1590                        if row_idx < 3 {
1591                            debug!(
1592                                "QueryEngine: WHERE evaluation error for row {}: {}",
1593                                row_idx, e
1594                            );
1595                        }
1596                        // Propagate WHERE clause errors instead of silently ignoring them
1597                        return Err(e);
1598                    }
1599                }
1600            }
1601
1602            // Log regex cache statistics
1603            let (compilations, cache_hits) = eval_context.get_stats();
1604            if compilations > 0 || cache_hits > 0 {
1605                debug!(
1606                    "LIKE pattern cache: {} compilations, {} cache hits",
1607                    compilations, cache_hits
1608                );
1609            }
1610            visible_rows = filtered_rows;
1611            let filter_duration = filter_start.elapsed();
1612            info!(
1613                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1614                total_rows,
1615                visible_rows.len(),
1616                filter_duration
1617            );
1618
1619            plan.set_rows_out(visible_rows.len());
1620            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1621            plan.add_detail(format!(
1622                "Filter time: {:.3}ms",
1623                filter_duration.as_secs_f64() * 1000.0
1624            ));
1625            plan.end_step();
1626        }
1627
1628        // Create initial DataView with filtered rows
1629        let mut view = DataView::new(table.clone());
1630        view = view.with_rows(visible_rows);
1631
1632        // Handle GROUP BY if present
1633        if let Some(group_by_exprs) = &statement.group_by {
1634            if !group_by_exprs.is_empty() {
1635                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1636
1637                plan.begin_step(
1638                    StepType::GroupBy,
1639                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1640                );
1641                plan.set_rows_in(view.row_count());
1642                plan.add_detail(format!("Input: {} rows", view.row_count()));
1643                for expr in group_by_exprs {
1644                    plan.add_detail(format!("Group by: {:?}", expr));
1645                }
1646
1647                let group_start = Instant::now();
1648                view = self.apply_group_by(
1649                    view,
1650                    group_by_exprs,
1651                    &statement.select_items,
1652                    statement.having.as_ref(),
1653                    plan,
1654                )?;
1655
1656                plan.set_rows_out(view.row_count());
1657                plan.add_detail(format!("Output: {} groups", view.row_count()));
1658                plan.add_detail(format!(
1659                    "Overall time: {:.3}ms",
1660                    group_start.elapsed().as_secs_f64() * 1000.0
1661                ));
1662                plan.end_step();
1663            }
1664        } else {
1665            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1666            if !statement.select_items.is_empty() {
1667                // Check if we have ANY non-star items (not just the first one)
1668                let has_non_star_items = statement
1669                    .select_items
1670                    .iter()
1671                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1672
1673                // Apply select items if:
1674                // 1. We have computed expressions or explicit columns
1675                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1676                if has_non_star_items || statement.select_items.len() > 1 {
1677                    view = self.apply_select_items(
1678                        view,
1679                        &statement.select_items,
1680                        &statement,
1681                        exec_context,
1682                        plan,
1683                    )?;
1684                }
1685                // If it's just a single star, no projection needed
1686            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1687                debug!("QueryEngine: Using legacy columns path");
1688                // Fallback to legacy column projection for backward compatibility
1689                // Use the current view's source table, not the original table
1690                let source_table = view.source();
1691                let column_indices =
1692                    self.resolve_column_indices(source_table, &statement.columns)?;
1693                view = view.with_columns(column_indices);
1694            }
1695        }
1696
1697        // Apply DISTINCT if specified
1698        if statement.distinct {
1699            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1700            plan.set_rows_in(view.row_count());
1701            plan.add_detail(format!("Input: {} rows", view.row_count()));
1702
1703            let distinct_start = Instant::now();
1704            view = self.apply_distinct(view)?;
1705
1706            plan.set_rows_out(view.row_count());
1707            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1708            plan.add_detail(format!(
1709                "Distinct time: {:.3}ms",
1710                distinct_start.elapsed().as_secs_f64() * 1000.0
1711            ));
1712            plan.end_step();
1713        }
1714
1715        // Apply ORDER BY sorting
1716        if let Some(order_by_columns) = &statement.order_by {
1717            if !order_by_columns.is_empty() {
1718                plan.begin_step(
1719                    StepType::Sort,
1720                    format!("ORDER BY {} columns", order_by_columns.len()),
1721                );
1722                plan.set_rows_in(view.row_count());
1723                for col in order_by_columns {
1724                    // Format the expression (simplified for now - just show column name or "expr")
1725                    let expr_str = match &col.expr {
1726                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1727                        _ => "expr".to_string(),
1728                    };
1729                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1730                }
1731
1732                let sort_start = Instant::now();
1733                view =
1734                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1735
1736                plan.add_detail(format!(
1737                    "Sort time: {:.3}ms",
1738                    sort_start.elapsed().as_secs_f64() * 1000.0
1739                ));
1740                plan.end_step();
1741            }
1742        }
1743
1744        // Apply LIMIT/OFFSET
1745        if let Some(limit) = statement.limit {
1746            let offset = statement.offset.unwrap_or(0);
1747            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1748            plan.set_rows_in(view.row_count());
1749            if offset > 0 {
1750                plan.add_detail(format!("OFFSET: {}", offset));
1751            }
1752            view = view.with_limit(limit, offset);
1753            plan.set_rows_out(view.row_count());
1754            plan.add_detail(format!("Output: {} rows", view.row_count()));
1755            plan.end_step();
1756        }
1757
1758        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1759        if !statement.set_operations.is_empty() {
1760            plan.begin_step(
1761                StepType::SetOperation,
1762                format!("Process {} set operations", statement.set_operations.len()),
1763            );
1764            plan.set_rows_in(view.row_count());
1765
1766            // Materialize the first result set
1767            let mut combined_table = self.materialize_view(view)?;
1768            let first_columns = combined_table.column_names();
1769            let first_column_count = first_columns.len();
1770
1771            // Track if any operation requires deduplication
1772            let mut needs_deduplication = false;
1773
1774            // Process each set operation
1775            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1776                let op_start = Instant::now();
1777                plan.begin_step(
1778                    StepType::SetOperation,
1779                    format!("{:?} operation #{}", operation, idx + 1),
1780                );
1781
1782                // Execute the next SELECT statement
1783                // We need to pass the original table and exec_context for proper resolution
1784                let next_view = if let Some(exec_ctx) = exec_context {
1785                    self.build_view_internal_with_plan_and_exec(
1786                        table.clone(),
1787                        *next_statement.clone(),
1788                        plan,
1789                        Some(exec_ctx),
1790                    )?
1791                } else {
1792                    self.build_view_internal_with_plan(
1793                        table.clone(),
1794                        *next_statement.clone(),
1795                        plan,
1796                    )?
1797                };
1798
1799                // Materialize the next result set
1800                let next_table = self.materialize_view(next_view)?;
1801                let next_columns = next_table.column_names();
1802                let next_column_count = next_columns.len();
1803
1804                // Validate schema compatibility
1805                if first_column_count != next_column_count {
1806                    return Err(anyhow!(
1807                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1808                        first_column_count,
1809                        idx + 2,
1810                        next_column_count
1811                    ));
1812                }
1813
1814                // Warn if column names don't match (but allow it - some SQL dialects do)
1815                for (col_idx, (first_col, next_col)) in
1816                    first_columns.iter().zip(next_columns.iter()).enumerate()
1817                {
1818                    if !first_col.eq_ignore_ascii_case(next_col) {
1819                        debug!(
1820                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1821                            col_idx + 1,
1822                            first_col,
1823                            next_col
1824                        );
1825                    }
1826                }
1827
1828                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1829                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1830
1831                // Perform the set operation
1832                match operation {
1833                    SetOperation::UnionAll => {
1834                        // UNION ALL: Simply concatenate all rows without deduplication
1835                        for row in next_table.rows.iter() {
1836                            combined_table.add_row(row.clone());
1837                        }
1838                        plan.add_detail(format!(
1839                            "Result: {} rows (no deduplication)",
1840                            combined_table.row_count()
1841                        ));
1842                    }
1843                    SetOperation::Union => {
1844                        // UNION: Concatenate all rows first, deduplicate at the end
1845                        for row in next_table.rows.iter() {
1846                            combined_table.add_row(row.clone());
1847                        }
1848                        needs_deduplication = true;
1849                        plan.add_detail(format!(
1850                            "Combined: {} rows (deduplication pending)",
1851                            combined_table.row_count()
1852                        ));
1853                    }
1854                    SetOperation::Intersect => {
1855                        // INTERSECT: Keep only rows that appear in both
1856                        // TODO: Implement intersection logic
1857                        return Err(anyhow!("INTERSECT is not yet implemented"));
1858                    }
1859                    SetOperation::Except => {
1860                        // EXCEPT: Keep only rows from left that don't appear in right
1861                        // TODO: Implement except logic
1862                        return Err(anyhow!("EXCEPT is not yet implemented"));
1863                    }
1864                }
1865
1866                plan.add_detail(format!(
1867                    "Operation time: {:.3}ms",
1868                    op_start.elapsed().as_secs_f64() * 1000.0
1869                ));
1870                plan.set_rows_out(combined_table.row_count());
1871                plan.end_step();
1872            }
1873
1874            plan.set_rows_out(combined_table.row_count());
1875            plan.add_detail(format!(
1876                "Combined result: {} rows after {} operations",
1877                combined_table.row_count(),
1878                statement.set_operations.len()
1879            ));
1880            plan.end_step();
1881
1882            // Create a new view from the combined table
1883            view = DataView::new(Arc::new(combined_table));
1884
1885            // Apply deduplication if any UNION (not UNION ALL) operation was used
1886            if needs_deduplication {
1887                plan.begin_step(
1888                    StepType::Distinct,
1889                    "UNION deduplication - remove duplicate rows".to_string(),
1890                );
1891                plan.set_rows_in(view.row_count());
1892                plan.add_detail(format!("Input: {} rows", view.row_count()));
1893
1894                let distinct_start = Instant::now();
1895                view = self.apply_distinct(view)?;
1896
1897                plan.set_rows_out(view.row_count());
1898                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1899                plan.add_detail(format!(
1900                    "Deduplication time: {:.3}ms",
1901                    distinct_start.elapsed().as_secs_f64() * 1000.0
1902                ));
1903                plan.end_step();
1904            }
1905        }
1906
1907        Ok(view)
1908    }
1909
1910    /// Resolve column names to indices
1911    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1912        let mut indices = Vec::new();
1913        let table_columns = table.column_names();
1914
1915        for col_name in columns {
1916            let index = table_columns
1917                .iter()
1918                .position(|c| c.eq_ignore_ascii_case(col_name))
1919                .ok_or_else(|| {
1920                    let suggestion = self.find_similar_column(table, col_name);
1921                    match suggestion {
1922                        Some(similar) => anyhow::anyhow!(
1923                            "Column '{}' not found. Did you mean '{}'?",
1924                            col_name,
1925                            similar
1926                        ),
1927                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1928                    }
1929                })?;
1930            indices.push(index);
1931        }
1932
1933        Ok(indices)
1934    }
1935
1936    /// Apply SELECT items (columns and computed expressions) to create new view
1937    fn apply_select_items(
1938        &self,
1939        view: DataView,
1940        select_items: &[SelectItem],
1941        _statement: &SelectStatement,
1942        exec_context: Option<&ExecutionContext>,
1943        plan: &mut ExecutionPlanBuilder,
1944    ) -> Result<DataView> {
1945        debug!(
1946            "QueryEngine::apply_select_items - items: {:?}",
1947            select_items
1948        );
1949        debug!(
1950            "QueryEngine::apply_select_items - input view has {} rows",
1951            view.row_count()
1952        );
1953
1954        // Check if any select items contain window functions
1955        let has_window_functions = select_items.iter().any(|item| match item {
1956            SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1957            _ => false,
1958        });
1959
1960        // Count window functions for detailed reporting
1961        let window_func_count: usize = select_items
1962            .iter()
1963            .filter(|item| match item {
1964                SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1965                _ => false,
1966            })
1967            .count();
1968
1969        // Start timing for window function evaluation if present
1970        let window_start = if has_window_functions {
1971            debug!(
1972                "QueryEngine::apply_select_items - detected {} window functions",
1973                window_func_count
1974            );
1975
1976            // Extract window specs (Step 2: parallel path, not used yet)
1977            let window_specs = Self::extract_window_specs(select_items);
1978            debug!("Extracted {} window function specs", window_specs.len());
1979
1980            Some(Instant::now())
1981        } else {
1982            None
1983        };
1984
1985        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
1986        let has_unnest = select_items.iter().any(|item| match item {
1987            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1988            _ => false,
1989        });
1990
1991        if has_unnest {
1992            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1993            return self.apply_select_with_row_expansion(view, select_items);
1994        }
1995
1996        // Check if this is an aggregate query:
1997        // 1. At least one aggregate function exists
1998        // 2. All other items are either aggregates or constants (aggregate-compatible)
1999        let has_aggregates = select_items.iter().any(|item| match item {
2000            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2001            SelectItem::Column { .. } => false,
2002            SelectItem::Star { .. } => false,
2003            SelectItem::StarExclude { .. } => false,
2004        });
2005
2006        let all_aggregate_compatible = select_items.iter().all(|item| match item {
2007            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2008            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
2009            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
2010            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
2011        });
2012
2013        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2014            // Special handling for aggregate queries with constants (no GROUP BY)
2015            // These should produce exactly one row
2016            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2017            return self.apply_aggregate_select(view, select_items);
2018        }
2019
2020        // Check if we need to create computed columns
2021        let has_computed_expressions = select_items
2022            .iter()
2023            .any(|item| matches!(item, SelectItem::Expression { .. }));
2024
2025        debug!(
2026            "QueryEngine::apply_select_items - has_computed_expressions: {}",
2027            has_computed_expressions
2028        );
2029
2030        if !has_computed_expressions {
2031            // Simple case: only columns, use existing projection logic
2032            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2033            return Ok(view.with_columns(column_indices));
2034        }
2035
2036        // Complex case: we have computed expressions
2037        // IMPORTANT: We create a PROJECTED view, not a new table
2038        // This preserves the original DataTable reference
2039
2040        let source_table = view.source();
2041        let visible_rows = view.visible_row_indices();
2042
2043        // Create a temporary table just for the computed result view
2044        // But this table is only used for the current query result
2045        let mut computed_table = DataTable::new("query_result");
2046
2047        // First, expand any Star selectors to actual columns
2048        let mut expanded_items = Vec::new();
2049        for item in select_items {
2050            match item {
2051                SelectItem::Star { table_prefix, .. } => {
2052                    if let Some(prefix) = table_prefix {
2053                        // Scoped expansion: table.* expands only columns from that table
2054                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2055                        for col in &source_table.columns {
2056                            if Self::column_matches_table(col, prefix) {
2057                                expanded_items.push(SelectItem::Column {
2058                                    column: ColumnRef::unquoted(col.name.clone()),
2059                                    leading_comments: vec![],
2060                                    trailing_comment: None,
2061                                });
2062                            }
2063                        }
2064                    } else {
2065                        // Unscoped expansion: * expands to all columns
2066                        debug!("QueryEngine::apply_select_items - expanding *");
2067                        for col_name in source_table.column_names() {
2068                            expanded_items.push(SelectItem::Column {
2069                                column: ColumnRef::unquoted(col_name.to_string()),
2070                                leading_comments: vec![],
2071                                trailing_comment: None,
2072                            });
2073                        }
2074                    }
2075                }
2076                _ => expanded_items.push(item.clone()),
2077            }
2078        }
2079
2080        // Add columns based on expanded SelectItems, handling duplicates
2081        let mut column_name_counts: std::collections::HashMap<String, usize> =
2082            std::collections::HashMap::new();
2083
2084        for item in &expanded_items {
2085            let base_name = match item {
2086                SelectItem::Column {
2087                    column: col_ref, ..
2088                } => col_ref.name.clone(),
2089                SelectItem::Expression { alias, .. } => alias.clone(),
2090                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2091                SelectItem::StarExclude { .. } => {
2092                    unreachable!("StarExclude should have been expanded")
2093                }
2094            };
2095
2096            // Check if this column name has been used before
2097            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2098            let column_name = if *count == 0 {
2099                // First occurrence, use the name as-is
2100                base_name.clone()
2101            } else {
2102                // Duplicate, append a suffix
2103                format!("{base_name}_{count}")
2104            };
2105            *count += 1;
2106
2107            computed_table.add_column(DataColumn::new(&column_name));
2108        }
2109
2110        // Check if batch evaluation can be used
2111        // Batch evaluation is the default but we need to check if all window functions
2112        // are standalone (not embedded in expressions)
2113        let can_use_batch = expanded_items.iter().all(|item| {
2114            match item {
2115                SelectItem::Expression { expr, .. } => {
2116                    // Only use batch evaluation if the expression IS a window function,
2117                    // not if it CONTAINS a window function
2118                    matches!(expr, SqlExpression::WindowFunction { .. })
2119                        || !Self::contains_window_function(expr)
2120                }
2121                _ => true, // Non-expressions are fine
2122            }
2123        });
2124
2125        // Batch evaluation is now the default mode for improved performance
2126        // Users can opt-out by setting SQL_CLI_BATCH_WINDOW=0 or false
2127        let use_batch_evaluation = can_use_batch
2128            && std::env::var("SQL_CLI_BATCH_WINDOW")
2129                .map(|v| v != "0" && v.to_lowercase() != "false")
2130                .unwrap_or(true);
2131
2132        // Store window specs for batch evaluation if needed
2133        let batch_window_specs = if use_batch_evaluation && has_window_functions {
2134            debug!("BATCH window function evaluation flag is enabled");
2135            // Extract window specs before timing starts
2136            let specs = Self::extract_window_specs(&expanded_items);
2137            debug!(
2138                "Extracted {} window function specs for batch evaluation",
2139                specs.len()
2140            );
2141            Some(specs)
2142        } else {
2143            None
2144        };
2145
2146        // Calculate values for each row
2147        let mut evaluator =
2148            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2149
2150        // Populate table aliases from exec_context if available
2151        if let Some(exec_ctx) = exec_context {
2152            let aliases = exec_ctx.get_aliases();
2153            if !aliases.is_empty() {
2154                debug!(
2155                    "Applying {} aliases to evaluator: {:?}",
2156                    aliases.len(),
2157                    aliases
2158                );
2159                evaluator = evaluator.with_table_aliases(aliases);
2160            }
2161        }
2162
2163        // OPTIMIZATION: Pre-create WindowContexts before the row loop
2164        // This avoids 50,000+ redundant context lookups
2165        if has_window_functions {
2166            let preload_start = Instant::now();
2167
2168            // Extract all unique WindowSpecs from SELECT items
2169            let mut window_specs = Vec::new();
2170            for item in &expanded_items {
2171                if let SelectItem::Expression { expr, .. } = item {
2172                    Self::collect_window_specs(expr, &mut window_specs);
2173                }
2174            }
2175
2176            // Pre-create all WindowContexts
2177            for spec in &window_specs {
2178                let _ = evaluator.get_or_create_window_context(spec);
2179            }
2180
2181            debug!(
2182                "Pre-created {} WindowContext(s) in {:.2}ms",
2183                window_specs.len(),
2184                preload_start.elapsed().as_secs_f64() * 1000.0
2185            );
2186        }
2187
2188        // Batch evaluation path for window functions
2189        if let Some(window_specs) = batch_window_specs {
2190            debug!("Starting batch window function evaluation");
2191            let batch_start = Instant::now();
2192
2193            // Initialize result table with all rows
2194            let mut batch_results: Vec<Vec<DataValue>> =
2195                vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2196
2197            // Use the window specs we extracted earlier
2198            let detailed_window_specs = &window_specs;
2199
2200            // Group window specs by their WindowSpec for batch processing
2201            let mut specs_by_window: HashMap<
2202                u64,
2203                Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2204            > = HashMap::new();
2205            for spec in detailed_window_specs {
2206                let hash = spec.spec.compute_hash();
2207                specs_by_window
2208                    .entry(hash)
2209                    .or_insert_with(Vec::new)
2210                    .push(spec);
2211            }
2212
2213            // Process each unique window specification
2214            for (_window_hash, specs) in specs_by_window {
2215                // Get the window context (already pre-created)
2216                let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2217
2218                // Process each function using this window
2219                for spec in specs {
2220                    match spec.function_name.as_str() {
2221                        "LAG" => {
2222                            // Extract column and offset from arguments
2223                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2224                                let column_name = col_ref.name.as_str();
2225                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2226                                    spec.args.get(1)
2227                                {
2228                                    n.parse::<i64>().unwrap_or(1)
2229                                } else {
2230                                    1 // default offset
2231                                };
2232
2233                                let values = context.evaluate_lag_batch(
2234                                    visible_rows,
2235                                    column_name,
2236                                    offset,
2237                                )?;
2238
2239                                // Write results to the output column
2240                                for (row_idx, value) in values.into_iter().enumerate() {
2241                                    batch_results[row_idx][spec.output_column_index] = value;
2242                                }
2243                            }
2244                        }
2245                        "LEAD" => {
2246                            // Extract column and offset from arguments
2247                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2248                                let column_name = col_ref.name.as_str();
2249                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2250                                    spec.args.get(1)
2251                                {
2252                                    n.parse::<i64>().unwrap_or(1)
2253                                } else {
2254                                    1 // default offset
2255                                };
2256
2257                                let values = context.evaluate_lead_batch(
2258                                    visible_rows,
2259                                    column_name,
2260                                    offset,
2261                                )?;
2262
2263                                // Write results to the output column
2264                                for (row_idx, value) in values.into_iter().enumerate() {
2265                                    batch_results[row_idx][spec.output_column_index] = value;
2266                                }
2267                            }
2268                        }
2269                        "ROW_NUMBER" => {
2270                            let values = context.evaluate_row_number_batch(visible_rows)?;
2271
2272                            // Write results to the output column
2273                            for (row_idx, value) in values.into_iter().enumerate() {
2274                                batch_results[row_idx][spec.output_column_index] = value;
2275                            }
2276                        }
2277                        "RANK" => {
2278                            let values = context.evaluate_rank_batch(visible_rows)?;
2279
2280                            // Write results to the output column
2281                            for (row_idx, value) in values.into_iter().enumerate() {
2282                                batch_results[row_idx][spec.output_column_index] = value;
2283                            }
2284                        }
2285                        "DENSE_RANK" => {
2286                            let values = context.evaluate_dense_rank_batch(visible_rows)?;
2287
2288                            // Write results to the output column
2289                            for (row_idx, value) in values.into_iter().enumerate() {
2290                                batch_results[row_idx][spec.output_column_index] = value;
2291                            }
2292                        }
2293                        "SUM" => {
2294                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2295                                let column_name = col_ref.name.as_str();
2296                                let values =
2297                                    context.evaluate_sum_batch(visible_rows, column_name)?;
2298
2299                                for (row_idx, value) in values.into_iter().enumerate() {
2300                                    batch_results[row_idx][spec.output_column_index] = value;
2301                                }
2302                            }
2303                        }
2304                        "AVG" => {
2305                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2306                                let column_name = col_ref.name.as_str();
2307                                let values =
2308                                    context.evaluate_avg_batch(visible_rows, column_name)?;
2309
2310                                for (row_idx, value) in values.into_iter().enumerate() {
2311                                    batch_results[row_idx][spec.output_column_index] = value;
2312                                }
2313                            }
2314                        }
2315                        "MIN" => {
2316                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2317                                let column_name = col_ref.name.as_str();
2318                                let values =
2319                                    context.evaluate_min_batch(visible_rows, column_name)?;
2320
2321                                for (row_idx, value) in values.into_iter().enumerate() {
2322                                    batch_results[row_idx][spec.output_column_index] = value;
2323                                }
2324                            }
2325                        }
2326                        "MAX" => {
2327                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2328                                let column_name = col_ref.name.as_str();
2329                                let values =
2330                                    context.evaluate_max_batch(visible_rows, column_name)?;
2331
2332                                for (row_idx, value) in values.into_iter().enumerate() {
2333                                    batch_results[row_idx][spec.output_column_index] = value;
2334                                }
2335                            }
2336                        }
2337                        "COUNT" => {
2338                            // COUNT can be COUNT(*) or COUNT(column)
2339                            let column_name = match spec.args.get(0) {
2340                                Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2341                                Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2342                                _ => None,
2343                            };
2344
2345                            let values = context.evaluate_count_batch(visible_rows, column_name)?;
2346
2347                            for (row_idx, value) in values.into_iter().enumerate() {
2348                                batch_results[row_idx][spec.output_column_index] = value;
2349                            }
2350                        }
2351                        "FIRST_VALUE" => {
2352                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2353                                let column_name = col_ref.name.as_str();
2354                                let values = context
2355                                    .evaluate_first_value_batch(visible_rows, column_name)?;
2356
2357                                for (row_idx, value) in values.into_iter().enumerate() {
2358                                    batch_results[row_idx][spec.output_column_index] = value;
2359                                }
2360                            }
2361                        }
2362                        "LAST_VALUE" => {
2363                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2364                                let column_name = col_ref.name.as_str();
2365                                let values =
2366                                    context.evaluate_last_value_batch(visible_rows, column_name)?;
2367
2368                                for (row_idx, value) in values.into_iter().enumerate() {
2369                                    batch_results[row_idx][spec.output_column_index] = value;
2370                                }
2371                            }
2372                        }
2373                        _ => {
2374                            // Fall back to per-row evaluation for unsupported functions
2375                            debug!(
2376                                "Window function {} not supported in batch mode, using per-row",
2377                                spec.function_name
2378                            );
2379                        }
2380                    }
2381                }
2382            }
2383
2384            // Now evaluate non-window columns
2385            for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2386                for (col_idx, item) in expanded_items.iter().enumerate() {
2387                    // Skip if this column was already filled by a window function
2388                    if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2389                        continue;
2390                    }
2391
2392                    let value = match item {
2393                        SelectItem::Column {
2394                            column: col_ref, ..
2395                        } => {
2396                            match evaluator
2397                                .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2398                            {
2399                                Ok(val) => val,
2400                                Err(e) => {
2401                                    return Err(anyhow!(
2402                                        "Failed to evaluate column {}: {}",
2403                                        col_ref.to_sql(),
2404                                        e
2405                                    ));
2406                                }
2407                            }
2408                        }
2409                        SelectItem::Expression { expr, .. } => {
2410                            // For batch evaluation, we need to handle expressions differently
2411                            // If this is just a window function by itself, we already computed it
2412                            if matches!(expr, SqlExpression::WindowFunction { .. }) {
2413                                // Pure window function - already handled in batch evaluation
2414                                continue;
2415                            }
2416                            // For expressions containing window functions or regular expressions,
2417                            // evaluate them normally
2418                            evaluator.evaluate(&expr, source_row_idx)?
2419                        }
2420                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2421                        SelectItem::StarExclude { .. } => {
2422                            unreachable!("StarExclude should have been expanded")
2423                        }
2424                    };
2425                    batch_results[result_row_idx][col_idx] = value;
2426                }
2427            }
2428
2429            // Add all rows to the table
2430            for row_values in batch_results {
2431                computed_table
2432                    .add_row(DataRow::new(row_values))
2433                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2434            }
2435
2436            debug!(
2437                "Batch window evaluation completed in {:.3}ms",
2438                batch_start.elapsed().as_secs_f64() * 1000.0
2439            );
2440        } else {
2441            // Original per-row evaluation path
2442            for &row_idx in visible_rows {
2443                let mut row_values = Vec::new();
2444
2445                for item in &expanded_items {
2446                    let value = match item {
2447                        SelectItem::Column {
2448                            column: col_ref, ..
2449                        } => {
2450                            // Use evaluator for column resolution (handles aliases properly)
2451                            match evaluator
2452                                .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2453                            {
2454                                Ok(val) => val,
2455                                Err(e) => {
2456                                    return Err(anyhow!(
2457                                        "Failed to evaluate column {}: {}",
2458                                        col_ref.to_sql(),
2459                                        e
2460                                    ));
2461                                }
2462                            }
2463                        }
2464                        SelectItem::Expression { expr, .. } => {
2465                            // Computed expression
2466                            evaluator.evaluate(&expr, row_idx)?
2467                        }
2468                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2469                        SelectItem::StarExclude { .. } => {
2470                            unreachable!("StarExclude should have been expanded")
2471                        }
2472                    };
2473                    row_values.push(value);
2474                }
2475
2476                computed_table
2477                    .add_row(DataRow::new(row_values))
2478                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2479            }
2480        }
2481
2482        // Log window function timing if applicable
2483        if let Some(start) = window_start {
2484            let window_duration = start.elapsed();
2485            info!(
2486                "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2487                window_duration.as_secs_f64() * 1000.0,
2488                visible_rows.len(),
2489                window_func_count
2490            );
2491
2492            // Add to execution plan
2493            plan.begin_step(
2494                StepType::WindowFunction,
2495                format!("Evaluate {} window function(s)", window_func_count),
2496            );
2497            plan.set_rows_in(visible_rows.len());
2498            plan.set_rows_out(visible_rows.len());
2499            plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2500            plan.add_detail(format!("{} window functions evaluated", window_func_count));
2501            plan.add_detail(format!(
2502                "Evaluation time: {:.3}ms",
2503                window_duration.as_secs_f64() * 1000.0
2504            ));
2505            plan.end_step();
2506        }
2507
2508        // Return a view of the computed result
2509        // This is a temporary view for this query only
2510        Ok(DataView::new(Arc::new(computed_table)))
2511    }
2512
2513    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
2514    fn apply_select_with_row_expansion(
2515        &self,
2516        view: DataView,
2517        select_items: &[SelectItem],
2518    ) -> Result<DataView> {
2519        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2520
2521        let source_table = view.source();
2522        let visible_rows = view.visible_row_indices();
2523        let expander_registry = RowExpanderRegistry::new();
2524
2525        // Create result table
2526        let mut result_table = DataTable::new("unnest_result");
2527
2528        // Expand * to columns and set up result columns
2529        let mut expanded_items = Vec::new();
2530        for item in select_items {
2531            match item {
2532                SelectItem::Star { table_prefix, .. } => {
2533                    if let Some(prefix) = table_prefix {
2534                        // Scoped expansion: table.* expands only columns from that table
2535                        debug!(
2536                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2537                            prefix
2538                        );
2539                        for col in &source_table.columns {
2540                            if Self::column_matches_table(col, prefix) {
2541                                expanded_items.push(SelectItem::Column {
2542                                    column: ColumnRef::unquoted(col.name.clone()),
2543                                    leading_comments: vec![],
2544                                    trailing_comment: None,
2545                                });
2546                            }
2547                        }
2548                    } else {
2549                        // Unscoped expansion: * expands to all columns
2550                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2551                        for col_name in source_table.column_names() {
2552                            expanded_items.push(SelectItem::Column {
2553                                column: ColumnRef::unquoted(col_name.to_string()),
2554                                leading_comments: vec![],
2555                                trailing_comment: None,
2556                            });
2557                        }
2558                    }
2559                }
2560                _ => expanded_items.push(item.clone()),
2561            }
2562        }
2563
2564        // Add columns to result table
2565        for item in &expanded_items {
2566            let column_name = match item {
2567                SelectItem::Column {
2568                    column: col_ref, ..
2569                } => col_ref.name.clone(),
2570                SelectItem::Expression { alias, .. } => alias.clone(),
2571                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2572                SelectItem::StarExclude { .. } => {
2573                    unreachable!("StarExclude should have been expanded")
2574                }
2575            };
2576            result_table.add_column(DataColumn::new(&column_name));
2577        }
2578
2579        // Process each input row
2580        let mut evaluator =
2581            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2582
2583        for &row_idx in visible_rows {
2584            // First pass: identify UNNEST expressions and collect their expansion arrays
2585            let mut unnest_expansions = Vec::new();
2586            let mut unnest_indices = Vec::new();
2587
2588            for (col_idx, item) in expanded_items.iter().enumerate() {
2589                if let SelectItem::Expression { expr, .. } = item {
2590                    if let Some(expansion_result) = self.try_expand_unnest(
2591                        &expr,
2592                        source_table,
2593                        row_idx,
2594                        &mut evaluator,
2595                        &expander_registry,
2596                    )? {
2597                        unnest_expansions.push(expansion_result);
2598                        unnest_indices.push(col_idx);
2599                    }
2600                }
2601            }
2602
2603            // Determine how many output rows to generate
2604            let expansion_count = if unnest_expansions.is_empty() {
2605                1 // No UNNEST, just one row
2606            } else {
2607                unnest_expansions
2608                    .iter()
2609                    .map(|exp| exp.row_count())
2610                    .max()
2611                    .unwrap_or(1)
2612            };
2613
2614            // Generate output rows
2615            for output_idx in 0..expansion_count {
2616                let mut row_values = Vec::new();
2617
2618                for (col_idx, item) in expanded_items.iter().enumerate() {
2619                    // Check if this column is an UNNEST column
2620                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2621
2622                    let value = if let Some(unnest_idx) = unnest_position {
2623                        // Get value from expansion array (or NULL if exhausted)
2624                        let expansion = &unnest_expansions[unnest_idx];
2625                        expansion
2626                            .values
2627                            .get(output_idx)
2628                            .cloned()
2629                            .unwrap_or(DataValue::Null)
2630                    } else {
2631                        // Regular column or non-UNNEST expression - replicate from input
2632                        match item {
2633                            SelectItem::Column {
2634                                column: col_ref, ..
2635                            } => {
2636                                let col_idx =
2637                                    source_table.get_column_index(&col_ref.name).ok_or_else(
2638                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2639                                    )?;
2640                                let row = source_table
2641                                    .get_row(row_idx)
2642                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2643                                row.get(col_idx)
2644                                    .ok_or_else(|| {
2645                                        anyhow::anyhow!("Column {} not found in row", col_idx)
2646                                    })?
2647                                    .clone()
2648                            }
2649                            SelectItem::Expression { expr, .. } => {
2650                                // Non-UNNEST expression - evaluate once and replicate
2651                                evaluator.evaluate(&expr, row_idx)?
2652                            }
2653                            SelectItem::Star { .. } => unreachable!(),
2654                            SelectItem::StarExclude { .. } => {
2655                                unreachable!("StarExclude should have been expanded")
2656                            }
2657                        }
2658                    };
2659
2660                    row_values.push(value);
2661                }
2662
2663                result_table
2664                    .add_row(DataRow::new(row_values))
2665                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2666            }
2667        }
2668
2669        debug!(
2670            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2671            visible_rows.len(),
2672            result_table.row_count()
2673        );
2674
2675        Ok(DataView::new(Arc::new(result_table)))
2676    }
2677
2678    /// Try to expand an expression if it's an UNNEST call
2679    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
2680    fn try_expand_unnest(
2681        &self,
2682        expr: &SqlExpression,
2683        _source_table: &DataTable,
2684        row_idx: usize,
2685        evaluator: &mut ArithmeticEvaluator,
2686        expander_registry: &RowExpanderRegistry,
2687    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2688        // Check for UNNEST variant (direct syntax)
2689        if let SqlExpression::Unnest { column, delimiter } = expr {
2690            // Evaluate the column expression
2691            let column_value = evaluator.evaluate(column, row_idx)?;
2692
2693            // Delimiter is already a string literal
2694            let delimiter_value = DataValue::String(delimiter.clone());
2695
2696            // Get the UNNEST expander
2697            let expander = expander_registry
2698                .get("UNNEST")
2699                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2700
2701            // Expand the value
2702            let expansion = expander.expand(&column_value, &[delimiter_value])?;
2703            return Ok(Some(expansion));
2704        }
2705
2706        // Also check for FunctionCall form (for compatibility)
2707        if let SqlExpression::FunctionCall { name, args, .. } = expr {
2708            if name.to_uppercase() == "UNNEST" {
2709                // UNNEST(column, delimiter)
2710                if args.len() != 2 {
2711                    return Err(anyhow::anyhow!(
2712                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2713                    ));
2714                }
2715
2716                // Evaluate the column expression (first arg)
2717                let column_value = evaluator.evaluate(&args[0], row_idx)?;
2718
2719                // Evaluate the delimiter expression (second arg)
2720                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2721
2722                // Get the UNNEST expander
2723                let expander = expander_registry
2724                    .get("UNNEST")
2725                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2726
2727                // Expand the value
2728                let expansion = expander.expand(&column_value, &[delimiter_value])?;
2729                return Ok(Some(expansion));
2730            }
2731        }
2732
2733        Ok(None)
2734    }
2735
2736    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
2737    fn apply_aggregate_select(
2738        &self,
2739        view: DataView,
2740        select_items: &[SelectItem],
2741    ) -> Result<DataView> {
2742        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2743
2744        let source_table = view.source();
2745        let mut result_table = DataTable::new("aggregate_result");
2746
2747        // Add columns for each select item
2748        for item in select_items {
2749            let column_name = match item {
2750                SelectItem::Expression { alias, .. } => alias.clone(),
2751                _ => unreachable!("Should only have expressions in aggregate-only query"),
2752            };
2753            result_table.add_column(DataColumn::new(&column_name));
2754        }
2755
2756        // Create evaluator with visible rows from the view (for filtered aggregates)
2757        let visible_rows = view.visible_row_indices().to_vec();
2758        let mut evaluator =
2759            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2760                .with_visible_rows(visible_rows);
2761
2762        // Evaluate each aggregate expression once (they handle all rows internally)
2763        let mut row_values = Vec::new();
2764        for item in select_items {
2765            match item {
2766                SelectItem::Expression { expr, .. } => {
2767                    // The evaluator will handle aggregates over all rows
2768                    // We pass row_index=0 but aggregates ignore it and process all rows
2769                    let value = evaluator.evaluate(expr, 0)?;
2770                    row_values.push(value);
2771                }
2772                _ => unreachable!("Should only have expressions in aggregate-only query"),
2773            }
2774        }
2775
2776        // Add the single result row
2777        result_table
2778            .add_row(DataRow::new(row_values))
2779            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2780
2781        Ok(DataView::new(Arc::new(result_table)))
2782    }
2783
2784    /// Check if a column belongs to a specific table based on source_table or qualified_name
2785    ///
2786    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2787    /// to filter which columns should be included.
2788    ///
2789    /// # Arguments
2790    /// * `col` - The column to check
2791    /// * `table_name` - The table name or alias to match against
2792    ///
2793    /// # Returns
2794    /// `true` if the column belongs to the specified table
2795    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2796        // First, check the source_table field
2797        if let Some(ref source) = col.source_table {
2798            // Direct match or matches with schema qualification
2799            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2800                return true;
2801            }
2802        }
2803
2804        // Second, check the qualified_name field
2805        if let Some(ref qualified) = col.qualified_name {
2806            // Check if qualified name starts with "table_name."
2807            if qualified.starts_with(&format!("{}.", table_name)) {
2808                return true;
2809            }
2810        }
2811
2812        false
2813    }
2814
2815    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2816    fn resolve_select_columns(
2817        &self,
2818        table: &DataTable,
2819        select_items: &[SelectItem],
2820    ) -> Result<Vec<usize>> {
2821        let mut indices = Vec::new();
2822        let table_columns = table.column_names();
2823
2824        for item in select_items {
2825            match item {
2826                SelectItem::Column {
2827                    column: col_ref, ..
2828                } => {
2829                    // Check if this has a table prefix
2830                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2831                        // For qualified references, ONLY try qualified lookup - no fallback
2832                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2833                        table.find_column_by_qualified_name(&qualified_name)
2834                            .ok_or_else(|| {
2835                                // Check if any columns have qualified names for better error message
2836                                let has_qualified = table.columns.iter()
2837                                    .any(|c| c.qualified_name.is_some());
2838                                if !has_qualified {
2839                                    anyhow::anyhow!(
2840                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2841                                        qualified_name, table_prefix
2842                                    )
2843                                } else {
2844                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2845                                }
2846                            })?
2847                    } else {
2848                        // Simple column name lookup
2849                        table_columns
2850                            .iter()
2851                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2852                            .ok_or_else(|| {
2853                                let suggestion = self.find_similar_column(table, &col_ref.name);
2854                                match suggestion {
2855                                    Some(similar) => anyhow::anyhow!(
2856                                        "Column '{}' not found. Did you mean '{}'?",
2857                                        col_ref.name,
2858                                        similar
2859                                    ),
2860                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2861                                }
2862                            })?
2863                    };
2864                    indices.push(index);
2865                }
2866                SelectItem::Star { table_prefix, .. } => {
2867                    if let Some(prefix) = table_prefix {
2868                        // Scoped expansion: table.* expands only columns from that table
2869                        for (i, col) in table.columns.iter().enumerate() {
2870                            if Self::column_matches_table(col, prefix) {
2871                                indices.push(i);
2872                            }
2873                        }
2874                    } else {
2875                        // Unscoped expansion: * expands to all column indices
2876                        for i in 0..table_columns.len() {
2877                            indices.push(i);
2878                        }
2879                    }
2880                }
2881                SelectItem::StarExclude {
2882                    table_prefix,
2883                    excluded_columns,
2884                    ..
2885                } => {
2886                    // Expand all columns (with optional table prefix), then exclude specified ones
2887                    if let Some(prefix) = table_prefix {
2888                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2889                        for (i, col) in table.columns.iter().enumerate() {
2890                            if Self::column_matches_table(col, prefix)
2891                                && !excluded_columns.contains(&col.name)
2892                            {
2893                                indices.push(i);
2894                            }
2895                        }
2896                    } else {
2897                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2898                        for (i, col_name) in table_columns.iter().enumerate() {
2899                            if !excluded_columns
2900                                .iter()
2901                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2902                            {
2903                                indices.push(i);
2904                            }
2905                        }
2906                    }
2907                }
2908                SelectItem::Expression { .. } => {
2909                    return Err(anyhow::anyhow!(
2910                        "Computed expressions require new table creation"
2911                    ));
2912                }
2913            }
2914        }
2915
2916        Ok(indices)
2917    }
2918
2919    /// Apply DISTINCT to remove duplicate rows
2920    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2921        use std::collections::HashSet;
2922
2923        let source = view.source();
2924        let visible_cols = view.visible_column_indices();
2925        let visible_rows = view.visible_row_indices();
2926
2927        // Build a set to track unique rows
2928        let mut seen_rows = HashSet::new();
2929        let mut unique_row_indices = Vec::new();
2930
2931        for &row_idx in visible_rows {
2932            // Build a key representing this row's visible column values
2933            let mut row_key = Vec::new();
2934            for &col_idx in visible_cols {
2935                let value = source
2936                    .get_value(row_idx, col_idx)
2937                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2938                // Convert value to a hashable representation
2939                row_key.push(format!("{:?}", value));
2940            }
2941
2942            // Check if we've seen this row before
2943            if seen_rows.insert(row_key) {
2944                // First time seeing this row combination
2945                unique_row_indices.push(row_idx);
2946            }
2947        }
2948
2949        // Create a new view with only unique rows
2950        Ok(view.with_rows(unique_row_indices))
2951    }
2952
2953    /// Apply multi-column ORDER BY sorting to the view
2954    fn apply_multi_order_by(
2955        &self,
2956        view: DataView,
2957        order_by_columns: &[OrderByItem],
2958    ) -> Result<DataView> {
2959        self.apply_multi_order_by_with_context(view, order_by_columns, None)
2960    }
2961
2962    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
2963    fn apply_multi_order_by_with_context(
2964        &self,
2965        mut view: DataView,
2966        order_by_columns: &[OrderByItem],
2967        _exec_context: Option<&ExecutionContext>,
2968    ) -> Result<DataView> {
2969        // Build list of (source_column_index, ascending) tuples
2970        let mut sort_columns = Vec::new();
2971
2972        for order_col in order_by_columns {
2973            // Extract column name from expression (currently only supports simple columns)
2974            let column_name = match &order_col.expr {
2975                SqlExpression::Column(col_ref) => col_ref.name.clone(),
2976                _ => {
2977                    // TODO: Support expression evaluation in ORDER BY
2978                    return Err(anyhow!(
2979                        "ORDER BY expressions not yet supported - only simple columns allowed"
2980                    ));
2981                }
2982            };
2983
2984            // Try to find the column index, handling qualified column names (table.column)
2985            let col_index = if column_name.contains('.') {
2986                // Qualified column name - extract unqualified part
2987                if let Some(dot_pos) = column_name.rfind('.') {
2988                    let col_name = &column_name[dot_pos + 1..];
2989
2990                    // After SELECT processing, columns are unqualified
2991                    // So just use the column name part
2992                    debug!(
2993                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
2994                        col_name, column_name
2995                    );
2996                    view.source().get_column_index(col_name)
2997                } else {
2998                    view.source().get_column_index(&column_name)
2999                }
3000            } else {
3001                // Simple column name
3002                view.source().get_column_index(&column_name)
3003            }
3004            .ok_or_else(|| {
3005                // If not found, provide helpful error with suggestions
3006                let suggestion = self.find_similar_column(view.source(), &column_name);
3007                match suggestion {
3008                    Some(similar) => anyhow::anyhow!(
3009                        "Column '{}' not found. Did you mean '{}'?",
3010                        column_name,
3011                        similar
3012                    ),
3013                    None => {
3014                        // Also list available columns for debugging
3015                        let available_cols = view.source().column_names().join(", ");
3016                        anyhow::anyhow!(
3017                            "Column '{}' not found. Available columns: {}",
3018                            column_name,
3019                            available_cols
3020                        )
3021                    }
3022                }
3023            })?;
3024
3025            let ascending = matches!(order_col.direction, SortDirection::Asc);
3026            sort_columns.push((col_index, ascending));
3027        }
3028
3029        // Apply multi-column sorting
3030        view.apply_multi_sort(&sort_columns)?;
3031        Ok(view)
3032    }
3033
3034    /// Apply GROUP BY to the view with optional HAVING clause
3035    fn apply_group_by(
3036        &self,
3037        view: DataView,
3038        group_by_exprs: &[SqlExpression],
3039        select_items: &[SelectItem],
3040        having: Option<&SqlExpression>,
3041        plan: &mut ExecutionPlanBuilder,
3042    ) -> Result<DataView> {
3043        // Use the new expression-based GROUP BY implementation
3044        let (result_view, phase_info) = self.apply_group_by_expressions(
3045            view,
3046            group_by_exprs,
3047            select_items,
3048            having,
3049            self.case_insensitive,
3050            self.date_notation.clone(),
3051        )?;
3052
3053        // Add detailed phase information to the execution plan
3054        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3055        plan.add_detail(format!(
3056            "Phase 1 - Group Building: {:.3}ms",
3057            phase_info.phase2_key_building.as_secs_f64() * 1000.0
3058        ));
3059        plan.add_detail(format!(
3060            "  • Processing {} rows into {} groups",
3061            phase_info.total_rows, phase_info.num_groups
3062        ));
3063        plan.add_detail(format!(
3064            "Phase 2 - Aggregation: {:.3}ms",
3065            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3066        ));
3067        if phase_info.phase4_having_evaluation > Duration::ZERO {
3068            plan.add_detail(format!(
3069                "Phase 3 - HAVING Filter: {:.3}ms",
3070                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3071            ));
3072            plan.add_detail(format!(
3073                "  • Filtered {} groups",
3074                phase_info.groups_filtered_by_having
3075            ));
3076        }
3077        plan.add_detail(format!(
3078            "Total GROUP BY time: {:.3}ms",
3079            phase_info.total_time.as_secs_f64() * 1000.0
3080        ));
3081
3082        Ok(result_view)
3083    }
3084
3085    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
3086    /// This helps pre-size hash tables for better performance
3087    pub fn estimate_group_cardinality(
3088        &self,
3089        view: &DataView,
3090        group_by_exprs: &[SqlExpression],
3091    ) -> usize {
3092        // If we have few rows, just return the row count as upper bound
3093        let row_count = view.get_visible_rows().len();
3094        if row_count <= 100 {
3095            return row_count;
3096        }
3097
3098        // Sample first 1000 rows or 10% of data, whichever is smaller
3099        let sample_size = min(1000, row_count / 10).max(100);
3100        let mut seen = FxHashSet::default();
3101
3102        let visible_rows = view.get_visible_rows();
3103        for (i, &row_idx) in visible_rows.iter().enumerate() {
3104            if i >= sample_size {
3105                break;
3106            }
3107
3108            // Evaluate GROUP BY expressions for this row
3109            let mut key_values = Vec::new();
3110            for expr in group_by_exprs {
3111                let mut evaluator = ArithmeticEvaluator::new(view.source());
3112                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3113                key_values.push(value);
3114            }
3115
3116            seen.insert(key_values);
3117        }
3118
3119        // Estimate total cardinality based on sample
3120        let sample_cardinality = seen.len();
3121        let estimated = (sample_cardinality * row_count) / sample_size;
3122
3123        // Cap at row count and ensure minimum of sample cardinality
3124        estimated.min(row_count).max(sample_cardinality)
3125    }
3126}
3127
3128#[cfg(test)]
3129mod tests {
3130    use super::*;
3131    use crate::data::datatable::{DataColumn, DataRow, DataValue};
3132
3133    fn create_test_table() -> Arc<DataTable> {
3134        let mut table = DataTable::new("test");
3135
3136        // Add columns
3137        table.add_column(DataColumn::new("id"));
3138        table.add_column(DataColumn::new("name"));
3139        table.add_column(DataColumn::new("age"));
3140
3141        // Add rows
3142        table
3143            .add_row(DataRow::new(vec![
3144                DataValue::Integer(1),
3145                DataValue::String("Alice".to_string()),
3146                DataValue::Integer(30),
3147            ]))
3148            .unwrap();
3149
3150        table
3151            .add_row(DataRow::new(vec![
3152                DataValue::Integer(2),
3153                DataValue::String("Bob".to_string()),
3154                DataValue::Integer(25),
3155            ]))
3156            .unwrap();
3157
3158        table
3159            .add_row(DataRow::new(vec![
3160                DataValue::Integer(3),
3161                DataValue::String("Charlie".to_string()),
3162                DataValue::Integer(35),
3163            ]))
3164            .unwrap();
3165
3166        Arc::new(table)
3167    }
3168
3169    #[test]
3170    fn test_select_all() {
3171        let table = create_test_table();
3172        let engine = QueryEngine::new();
3173
3174        let view = engine
3175            .execute(table.clone(), "SELECT * FROM users")
3176            .unwrap();
3177        assert_eq!(view.row_count(), 3);
3178        assert_eq!(view.column_count(), 3);
3179    }
3180
3181    #[test]
3182    fn test_select_columns() {
3183        let table = create_test_table();
3184        let engine = QueryEngine::new();
3185
3186        let view = engine
3187            .execute(table.clone(), "SELECT name, age FROM users")
3188            .unwrap();
3189        assert_eq!(view.row_count(), 3);
3190        assert_eq!(view.column_count(), 2);
3191    }
3192
3193    #[test]
3194    fn test_select_with_limit() {
3195        let table = create_test_table();
3196        let engine = QueryEngine::new();
3197
3198        let view = engine
3199            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3200            .unwrap();
3201        assert_eq!(view.row_count(), 2);
3202    }
3203
3204    #[test]
3205    fn test_type_coercion_contains() {
3206        // Initialize tracing for debug output
3207        let _ = tracing_subscriber::fmt()
3208            .with_max_level(tracing::Level::DEBUG)
3209            .try_init();
3210
3211        let mut table = DataTable::new("test");
3212        table.add_column(DataColumn::new("id"));
3213        table.add_column(DataColumn::new("status"));
3214        table.add_column(DataColumn::new("price"));
3215
3216        // Add test data with mixed types
3217        table
3218            .add_row(DataRow::new(vec![
3219                DataValue::Integer(1),
3220                DataValue::String("Pending".to_string()),
3221                DataValue::Float(99.99),
3222            ]))
3223            .unwrap();
3224
3225        table
3226            .add_row(DataRow::new(vec![
3227                DataValue::Integer(2),
3228                DataValue::String("Confirmed".to_string()),
3229                DataValue::Float(150.50),
3230            ]))
3231            .unwrap();
3232
3233        table
3234            .add_row(DataRow::new(vec![
3235                DataValue::Integer(3),
3236                DataValue::String("Pending".to_string()),
3237                DataValue::Float(75.00),
3238            ]))
3239            .unwrap();
3240
3241        let table = Arc::new(table);
3242        let engine = QueryEngine::new();
3243
3244        println!("\n=== Testing WHERE clause with Contains ===");
3245        println!("Table has {} rows", table.row_count());
3246        for i in 0..table.row_count() {
3247            let status = table.get_value(i, 1);
3248            println!("Row {i}: status = {status:?}");
3249        }
3250
3251        // Test 1: Basic string contains (should work)
3252        println!("\n--- Test 1: status.Contains('pend') ---");
3253        let result = engine.execute(
3254            table.clone(),
3255            "SELECT * FROM test WHERE status.Contains('pend')",
3256        );
3257        match result {
3258            Ok(view) => {
3259                println!("SUCCESS: Found {} matching rows", view.row_count());
3260                assert_eq!(view.row_count(), 2); // Should find both Pending rows
3261            }
3262            Err(e) => {
3263                panic!("Query failed: {e}");
3264            }
3265        }
3266
3267        // Test 2: Numeric contains (should work with type coercion)
3268        println!("\n--- Test 2: price.Contains('9') ---");
3269        let result = engine.execute(
3270            table.clone(),
3271            "SELECT * FROM test WHERE price.Contains('9')",
3272        );
3273        match result {
3274            Ok(view) => {
3275                println!(
3276                    "SUCCESS: Found {} matching rows with price containing '9'",
3277                    view.row_count()
3278                );
3279                // Should find 99.99 row
3280                assert!(view.row_count() >= 1);
3281            }
3282            Err(e) => {
3283                panic!("Numeric coercion query failed: {e}");
3284            }
3285        }
3286
3287        println!("\n=== All tests passed! ===");
3288    }
3289
3290    #[test]
3291    fn test_not_in_clause() {
3292        // Initialize tracing for debug output
3293        let _ = tracing_subscriber::fmt()
3294            .with_max_level(tracing::Level::DEBUG)
3295            .try_init();
3296
3297        let mut table = DataTable::new("test");
3298        table.add_column(DataColumn::new("id"));
3299        table.add_column(DataColumn::new("country"));
3300
3301        // Add test data
3302        table
3303            .add_row(DataRow::new(vec![
3304                DataValue::Integer(1),
3305                DataValue::String("CA".to_string()),
3306            ]))
3307            .unwrap();
3308
3309        table
3310            .add_row(DataRow::new(vec![
3311                DataValue::Integer(2),
3312                DataValue::String("US".to_string()),
3313            ]))
3314            .unwrap();
3315
3316        table
3317            .add_row(DataRow::new(vec![
3318                DataValue::Integer(3),
3319                DataValue::String("UK".to_string()),
3320            ]))
3321            .unwrap();
3322
3323        let table = Arc::new(table);
3324        let engine = QueryEngine::new();
3325
3326        println!("\n=== Testing NOT IN clause ===");
3327        println!("Table has {} rows", table.row_count());
3328        for i in 0..table.row_count() {
3329            let country = table.get_value(i, 1);
3330            println!("Row {i}: country = {country:?}");
3331        }
3332
3333        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
3334        println!("\n--- Test: country NOT IN ('CA') ---");
3335        let result = engine.execute(
3336            table.clone(),
3337            "SELECT * FROM test WHERE country NOT IN ('CA')",
3338        );
3339        match result {
3340            Ok(view) => {
3341                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3342                assert_eq!(view.row_count(), 2); // Should find US and UK
3343            }
3344            Err(e) => {
3345                panic!("NOT IN query failed: {e}");
3346            }
3347        }
3348
3349        println!("\n=== NOT IN test complete! ===");
3350    }
3351
3352    #[test]
3353    fn test_case_insensitive_in_and_not_in() {
3354        // Initialize tracing for debug output
3355        let _ = tracing_subscriber::fmt()
3356            .with_max_level(tracing::Level::DEBUG)
3357            .try_init();
3358
3359        let mut table = DataTable::new("test");
3360        table.add_column(DataColumn::new("id"));
3361        table.add_column(DataColumn::new("country"));
3362
3363        // Add test data with mixed case
3364        table
3365            .add_row(DataRow::new(vec![
3366                DataValue::Integer(1),
3367                DataValue::String("CA".to_string()), // uppercase
3368            ]))
3369            .unwrap();
3370
3371        table
3372            .add_row(DataRow::new(vec![
3373                DataValue::Integer(2),
3374                DataValue::String("us".to_string()), // lowercase
3375            ]))
3376            .unwrap();
3377
3378        table
3379            .add_row(DataRow::new(vec![
3380                DataValue::Integer(3),
3381                DataValue::String("UK".to_string()), // uppercase
3382            ]))
3383            .unwrap();
3384
3385        let table = Arc::new(table);
3386
3387        println!("\n=== Testing Case-Insensitive IN clause ===");
3388        println!("Table has {} rows", table.row_count());
3389        for i in 0..table.row_count() {
3390            let country = table.get_value(i, 1);
3391            println!("Row {i}: country = {country:?}");
3392        }
3393
3394        // Test case-insensitive IN - should match 'CA' with 'ca'
3395        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3396        let engine = QueryEngine::with_case_insensitive(true);
3397        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3398        match result {
3399            Ok(view) => {
3400                println!(
3401                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3402                    view.row_count()
3403                );
3404                assert_eq!(view.row_count(), 1); // Should find CA row
3405            }
3406            Err(e) => {
3407                panic!("Case-insensitive IN query failed: {e}");
3408            }
3409        }
3410
3411        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
3412        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3413        let result = engine.execute(
3414            table.clone(),
3415            "SELECT * FROM test WHERE country NOT IN ('ca')",
3416        );
3417        match result {
3418            Ok(view) => {
3419                println!(
3420                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3421                    view.row_count()
3422                );
3423                assert_eq!(view.row_count(), 2); // Should find us and UK rows
3424            }
3425            Err(e) => {
3426                panic!("Case-insensitive NOT IN query failed: {e}");
3427            }
3428        }
3429
3430        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
3431        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3432        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
3433        let result = engine_case_sensitive
3434            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3435        match result {
3436            Ok(view) => {
3437                println!(
3438                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3439                    view.row_count()
3440                );
3441                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
3442            }
3443            Err(e) => {
3444                panic!("Case-sensitive IN query failed: {e}");
3445            }
3446        }
3447
3448        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3449    }
3450
3451    #[test]
3452    #[ignore = "Parentheses in WHERE clause not yet implemented"]
3453    fn test_parentheses_in_where_clause() {
3454        // Initialize tracing for debug output
3455        let _ = tracing_subscriber::fmt()
3456            .with_max_level(tracing::Level::DEBUG)
3457            .try_init();
3458
3459        let mut table = DataTable::new("test");
3460        table.add_column(DataColumn::new("id"));
3461        table.add_column(DataColumn::new("status"));
3462        table.add_column(DataColumn::new("priority"));
3463
3464        // Add test data
3465        table
3466            .add_row(DataRow::new(vec![
3467                DataValue::Integer(1),
3468                DataValue::String("Pending".to_string()),
3469                DataValue::String("High".to_string()),
3470            ]))
3471            .unwrap();
3472
3473        table
3474            .add_row(DataRow::new(vec![
3475                DataValue::Integer(2),
3476                DataValue::String("Complete".to_string()),
3477                DataValue::String("High".to_string()),
3478            ]))
3479            .unwrap();
3480
3481        table
3482            .add_row(DataRow::new(vec![
3483                DataValue::Integer(3),
3484                DataValue::String("Pending".to_string()),
3485                DataValue::String("Low".to_string()),
3486            ]))
3487            .unwrap();
3488
3489        table
3490            .add_row(DataRow::new(vec![
3491                DataValue::Integer(4),
3492                DataValue::String("Complete".to_string()),
3493                DataValue::String("Low".to_string()),
3494            ]))
3495            .unwrap();
3496
3497        let table = Arc::new(table);
3498        let engine = QueryEngine::new();
3499
3500        println!("\n=== Testing Parentheses in WHERE clause ===");
3501        println!("Table has {} rows", table.row_count());
3502        for i in 0..table.row_count() {
3503            let status = table.get_value(i, 1);
3504            let priority = table.get_value(i, 2);
3505            println!("Row {i}: status = {status:?}, priority = {priority:?}");
3506        }
3507
3508        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
3509        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3510        let result = engine.execute(
3511            table.clone(),
3512            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3513        );
3514        match result {
3515            Ok(view) => {
3516                println!(
3517                    "SUCCESS: Found {} rows with parenthetical logic",
3518                    view.row_count()
3519                );
3520                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
3521            }
3522            Err(e) => {
3523                panic!("Parentheses query failed: {e}");
3524            }
3525        }
3526
3527        println!("\n=== Parentheses test complete! ===");
3528    }
3529
3530    #[test]
3531    #[ignore = "Numeric type coercion needs fixing"]
3532    fn test_numeric_type_coercion() {
3533        // Initialize tracing for debug output
3534        let _ = tracing_subscriber::fmt()
3535            .with_max_level(tracing::Level::DEBUG)
3536            .try_init();
3537
3538        let mut table = DataTable::new("test");
3539        table.add_column(DataColumn::new("id"));
3540        table.add_column(DataColumn::new("price"));
3541        table.add_column(DataColumn::new("quantity"));
3542
3543        // Add test data with different numeric types
3544        table
3545            .add_row(DataRow::new(vec![
3546                DataValue::Integer(1),
3547                DataValue::Float(99.50), // Contains '.'
3548                DataValue::Integer(100),
3549            ]))
3550            .unwrap();
3551
3552        table
3553            .add_row(DataRow::new(vec![
3554                DataValue::Integer(2),
3555                DataValue::Float(150.0), // Contains '.' and '0'
3556                DataValue::Integer(200),
3557            ]))
3558            .unwrap();
3559
3560        table
3561            .add_row(DataRow::new(vec![
3562                DataValue::Integer(3),
3563                DataValue::Integer(75), // No decimal point
3564                DataValue::Integer(50),
3565            ]))
3566            .unwrap();
3567
3568        let table = Arc::new(table);
3569        let engine = QueryEngine::new();
3570
3571        println!("\n=== Testing Numeric Type Coercion ===");
3572        println!("Table has {} rows", table.row_count());
3573        for i in 0..table.row_count() {
3574            let price = table.get_value(i, 1);
3575            let quantity = table.get_value(i, 2);
3576            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3577        }
3578
3579        // Test Contains on float values - should find rows with decimal points
3580        println!("\n--- Test: price.Contains('.') ---");
3581        let result = engine.execute(
3582            table.clone(),
3583            "SELECT * FROM test WHERE price.Contains('.')",
3584        );
3585        match result {
3586            Ok(view) => {
3587                println!(
3588                    "SUCCESS: Found {} rows with decimal points in price",
3589                    view.row_count()
3590                );
3591                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
3592            }
3593            Err(e) => {
3594                panic!("Numeric Contains query failed: {e}");
3595            }
3596        }
3597
3598        // Test Contains on integer values converted to string
3599        println!("\n--- Test: quantity.Contains('0') ---");
3600        let result = engine.execute(
3601            table.clone(),
3602            "SELECT * FROM test WHERE quantity.Contains('0')",
3603        );
3604        match result {
3605            Ok(view) => {
3606                println!(
3607                    "SUCCESS: Found {} rows with '0' in quantity",
3608                    view.row_count()
3609                );
3610                assert_eq!(view.row_count(), 2); // Should find 100 and 200
3611            }
3612            Err(e) => {
3613                panic!("Integer Contains query failed: {e}");
3614            }
3615        }
3616
3617        println!("\n=== Numeric type coercion test complete! ===");
3618    }
3619
3620    #[test]
3621    fn test_datetime_comparisons() {
3622        // Initialize tracing for debug output
3623        let _ = tracing_subscriber::fmt()
3624            .with_max_level(tracing::Level::DEBUG)
3625            .try_init();
3626
3627        let mut table = DataTable::new("test");
3628        table.add_column(DataColumn::new("id"));
3629        table.add_column(DataColumn::new("created_date"));
3630
3631        // Add test data with date strings (as they would come from CSV)
3632        table
3633            .add_row(DataRow::new(vec![
3634                DataValue::Integer(1),
3635                DataValue::String("2024-12-15".to_string()),
3636            ]))
3637            .unwrap();
3638
3639        table
3640            .add_row(DataRow::new(vec![
3641                DataValue::Integer(2),
3642                DataValue::String("2025-01-15".to_string()),
3643            ]))
3644            .unwrap();
3645
3646        table
3647            .add_row(DataRow::new(vec![
3648                DataValue::Integer(3),
3649                DataValue::String("2025-02-15".to_string()),
3650            ]))
3651            .unwrap();
3652
3653        let table = Arc::new(table);
3654        let engine = QueryEngine::new();
3655
3656        println!("\n=== Testing DateTime Comparisons ===");
3657        println!("Table has {} rows", table.row_count());
3658        for i in 0..table.row_count() {
3659            let date = table.get_value(i, 1);
3660            println!("Row {i}: created_date = {date:?}");
3661        }
3662
3663        // Test DateTime constructor comparison - should find dates after 2025-01-01
3664        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3665        let result = engine.execute(
3666            table.clone(),
3667            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3668        );
3669        match result {
3670            Ok(view) => {
3671                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3672                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
3673            }
3674            Err(e) => {
3675                panic!("DateTime comparison query failed: {e}");
3676            }
3677        }
3678
3679        println!("\n=== DateTime comparison test complete! ===");
3680    }
3681
3682    #[test]
3683    fn test_not_with_method_calls() {
3684        // Initialize tracing for debug output
3685        let _ = tracing_subscriber::fmt()
3686            .with_max_level(tracing::Level::DEBUG)
3687            .try_init();
3688
3689        let mut table = DataTable::new("test");
3690        table.add_column(DataColumn::new("id"));
3691        table.add_column(DataColumn::new("status"));
3692
3693        // Add test data
3694        table
3695            .add_row(DataRow::new(vec![
3696                DataValue::Integer(1),
3697                DataValue::String("Pending Review".to_string()),
3698            ]))
3699            .unwrap();
3700
3701        table
3702            .add_row(DataRow::new(vec![
3703                DataValue::Integer(2),
3704                DataValue::String("Complete".to_string()),
3705            ]))
3706            .unwrap();
3707
3708        table
3709            .add_row(DataRow::new(vec![
3710                DataValue::Integer(3),
3711                DataValue::String("Pending Approval".to_string()),
3712            ]))
3713            .unwrap();
3714
3715        let table = Arc::new(table);
3716        let engine = QueryEngine::with_case_insensitive(true);
3717
3718        println!("\n=== Testing NOT with Method Calls ===");
3719        println!("Table has {} rows", table.row_count());
3720        for i in 0..table.row_count() {
3721            let status = table.get_value(i, 1);
3722            println!("Row {i}: status = {status:?}");
3723        }
3724
3725        // Test NOT with Contains - should exclude rows containing "pend"
3726        println!("\n--- Test: NOT status.Contains('pend') ---");
3727        let result = engine.execute(
3728            table.clone(),
3729            "SELECT * FROM test WHERE NOT status.Contains('pend')",
3730        );
3731        match result {
3732            Ok(view) => {
3733                println!(
3734                    "SUCCESS: Found {} rows NOT containing 'pend'",
3735                    view.row_count()
3736                );
3737                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3738            }
3739            Err(e) => {
3740                panic!("NOT Contains query failed: {e}");
3741            }
3742        }
3743
3744        // Test NOT with StartsWith
3745        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3746        let result = engine.execute(
3747            table.clone(),
3748            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3749        );
3750        match result {
3751            Ok(view) => {
3752                println!(
3753                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
3754                    view.row_count()
3755                );
3756                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3757            }
3758            Err(e) => {
3759                panic!("NOT StartsWith query failed: {e}");
3760            }
3761        }
3762
3763        println!("\n=== NOT with method calls test complete! ===");
3764    }
3765
3766    #[test]
3767    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3768    fn test_complex_logical_expressions() {
3769        // Initialize tracing for debug output
3770        let _ = tracing_subscriber::fmt()
3771            .with_max_level(tracing::Level::DEBUG)
3772            .try_init();
3773
3774        let mut table = DataTable::new("test");
3775        table.add_column(DataColumn::new("id"));
3776        table.add_column(DataColumn::new("status"));
3777        table.add_column(DataColumn::new("priority"));
3778        table.add_column(DataColumn::new("assigned"));
3779
3780        // Add comprehensive test data
3781        table
3782            .add_row(DataRow::new(vec![
3783                DataValue::Integer(1),
3784                DataValue::String("Pending".to_string()),
3785                DataValue::String("High".to_string()),
3786                DataValue::String("John".to_string()),
3787            ]))
3788            .unwrap();
3789
3790        table
3791            .add_row(DataRow::new(vec![
3792                DataValue::Integer(2),
3793                DataValue::String("Complete".to_string()),
3794                DataValue::String("High".to_string()),
3795                DataValue::String("Jane".to_string()),
3796            ]))
3797            .unwrap();
3798
3799        table
3800            .add_row(DataRow::new(vec![
3801                DataValue::Integer(3),
3802                DataValue::String("Pending".to_string()),
3803                DataValue::String("Low".to_string()),
3804                DataValue::String("John".to_string()),
3805            ]))
3806            .unwrap();
3807
3808        table
3809            .add_row(DataRow::new(vec![
3810                DataValue::Integer(4),
3811                DataValue::String("In Progress".to_string()),
3812                DataValue::String("Medium".to_string()),
3813                DataValue::String("Jane".to_string()),
3814            ]))
3815            .unwrap();
3816
3817        let table = Arc::new(table);
3818        let engine = QueryEngine::new();
3819
3820        println!("\n=== Testing Complex Logical Expressions ===");
3821        println!("Table has {} rows", table.row_count());
3822        for i in 0..table.row_count() {
3823            let status = table.get_value(i, 1);
3824            let priority = table.get_value(i, 2);
3825            let assigned = table.get_value(i, 3);
3826            println!(
3827                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3828            );
3829        }
3830
3831        // Test complex AND/OR logic
3832        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3833        let result = engine.execute(
3834            table.clone(),
3835            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3836        );
3837        match result {
3838            Ok(view) => {
3839                println!(
3840                    "SUCCESS: Found {} rows with complex logic",
3841                    view.row_count()
3842                );
3843                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3844            }
3845            Err(e) => {
3846                panic!("Complex logic query failed: {e}");
3847            }
3848        }
3849
3850        // Test NOT with complex expressions
3851        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3852        let result = engine.execute(
3853            table.clone(),
3854            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3855        );
3856        match result {
3857            Ok(view) => {
3858                println!(
3859                    "SUCCESS: Found {} rows with NOT complex logic",
3860                    view.row_count()
3861                );
3862                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3863            }
3864            Err(e) => {
3865                panic!("NOT complex logic query failed: {e}");
3866            }
3867        }
3868
3869        println!("\n=== Complex logical expressions test complete! ===");
3870    }
3871
3872    #[test]
3873    fn test_mixed_data_types_and_edge_cases() {
3874        // Initialize tracing for debug output
3875        let _ = tracing_subscriber::fmt()
3876            .with_max_level(tracing::Level::DEBUG)
3877            .try_init();
3878
3879        let mut table = DataTable::new("test");
3880        table.add_column(DataColumn::new("id"));
3881        table.add_column(DataColumn::new("value"));
3882        table.add_column(DataColumn::new("nullable_field"));
3883
3884        // Add test data with mixed types and edge cases
3885        table
3886            .add_row(DataRow::new(vec![
3887                DataValue::Integer(1),
3888                DataValue::String("123.45".to_string()),
3889                DataValue::String("present".to_string()),
3890            ]))
3891            .unwrap();
3892
3893        table
3894            .add_row(DataRow::new(vec![
3895                DataValue::Integer(2),
3896                DataValue::Float(678.90),
3897                DataValue::Null,
3898            ]))
3899            .unwrap();
3900
3901        table
3902            .add_row(DataRow::new(vec![
3903                DataValue::Integer(3),
3904                DataValue::Boolean(true),
3905                DataValue::String("also present".to_string()),
3906            ]))
3907            .unwrap();
3908
3909        table
3910            .add_row(DataRow::new(vec![
3911                DataValue::Integer(4),
3912                DataValue::String("false".to_string()),
3913                DataValue::Null,
3914            ]))
3915            .unwrap();
3916
3917        let table = Arc::new(table);
3918        let engine = QueryEngine::new();
3919
3920        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3921        println!("Table has {} rows", table.row_count());
3922        for i in 0..table.row_count() {
3923            let value = table.get_value(i, 1);
3924            let nullable = table.get_value(i, 2);
3925            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3926        }
3927
3928        // Test type coercion with boolean Contains
3929        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3930        let result = engine.execute(
3931            table.clone(),
3932            "SELECT * FROM test WHERE value.Contains('true')",
3933        );
3934        match result {
3935            Ok(view) => {
3936                println!(
3937                    "SUCCESS: Found {} rows with boolean coercion",
3938                    view.row_count()
3939                );
3940                assert_eq!(view.row_count(), 1); // Should find the boolean true row
3941            }
3942            Err(e) => {
3943                panic!("Boolean coercion query failed: {e}");
3944            }
3945        }
3946
3947        // Test multiple IN values with mixed types
3948        println!("\n--- Test: id IN (1, 3) ---");
3949        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3950        match result {
3951            Ok(view) => {
3952                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3953                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
3954            }
3955            Err(e) => {
3956                panic!("Multiple IN values query failed: {e}");
3957            }
3958        }
3959
3960        println!("\n=== Mixed data types test complete! ===");
3961    }
3962
3963    /// Test that aggregate-only queries return exactly one row (regression test)
3964    #[test]
3965    fn test_aggregate_only_single_row() {
3966        let table = create_test_stock_data();
3967        let engine = QueryEngine::new();
3968
3969        // Test query with multiple aggregates - should return exactly 1 row
3970        let result = engine
3971            .execute(
3972                table.clone(),
3973                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3974            )
3975            .expect("Query should succeed");
3976
3977        assert_eq!(
3978            result.row_count(),
3979            1,
3980            "Aggregate-only query should return exactly 1 row"
3981        );
3982        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3983
3984        // Verify the actual values are correct
3985        let source = result.source();
3986        let row = source.get_row(0).expect("Should have first row");
3987
3988        // COUNT(*) should be 5 (total rows)
3989        assert_eq!(row.values[0], DataValue::Integer(5));
3990
3991        // MIN should be 99.5
3992        assert_eq!(row.values[1], DataValue::Float(99.5));
3993
3994        // MAX should be 105.0
3995        assert_eq!(row.values[2], DataValue::Float(105.0));
3996
3997        // AVG should be approximately 102.4
3998        if let DataValue::Float(avg) = &row.values[3] {
3999            assert!(
4000                (avg - 102.4).abs() < 0.01,
4001                "Average should be approximately 102.4, got {}",
4002                avg
4003            );
4004        } else {
4005            panic!("AVG should return a Float value");
4006        }
4007    }
4008
4009    /// Test single aggregate function returns single row
4010    #[test]
4011    fn test_single_aggregate_single_row() {
4012        let table = create_test_stock_data();
4013        let engine = QueryEngine::new();
4014
4015        let result = engine
4016            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4017            .expect("Query should succeed");
4018
4019        assert_eq!(
4020            result.row_count(),
4021            1,
4022            "Single aggregate query should return exactly 1 row"
4023        );
4024        assert_eq!(result.column_count(), 1, "Should have 1 column");
4025
4026        let source = result.source();
4027        let row = source.get_row(0).expect("Should have first row");
4028        assert_eq!(row.values[0], DataValue::Integer(5));
4029    }
4030
4031    /// Test aggregate with WHERE clause filtering
4032    #[test]
4033    fn test_aggregate_with_where_single_row() {
4034        let table = create_test_stock_data();
4035        let engine = QueryEngine::new();
4036
4037        // Filter to only high-value stocks (>= 103.0) and aggregate
4038        let result = engine
4039            .execute(
4040                table.clone(),
4041                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4042            )
4043            .expect("Query should succeed");
4044
4045        assert_eq!(
4046            result.row_count(),
4047            1,
4048            "Filtered aggregate query should return exactly 1 row"
4049        );
4050        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4051
4052        let source = result.source();
4053        let row = source.get_row(0).expect("Should have first row");
4054
4055        // Should find 2 rows (103.5 and 105.0)
4056        assert_eq!(row.values[0], DataValue::Integer(2));
4057        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
4058        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
4059    }
4060
4061    #[test]
4062    fn test_not_in_parsing() {
4063        use crate::sql::recursive_parser::Parser;
4064
4065        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4066        println!("\n=== Testing NOT IN parsing ===");
4067        println!("Parsing query: {query}");
4068
4069        let mut parser = Parser::new(query);
4070        match parser.parse() {
4071            Ok(statement) => {
4072                println!("Parsed statement: {statement:#?}");
4073                if let Some(where_clause) = statement.where_clause {
4074                    println!("WHERE conditions: {:#?}", where_clause.conditions);
4075                    if let Some(first_condition) = where_clause.conditions.first() {
4076                        println!("First condition expression: {:#?}", first_condition.expr);
4077                    }
4078                }
4079            }
4080            Err(e) => {
4081                panic!("Parse error: {e}");
4082            }
4083        }
4084    }
4085
4086    /// Create test stock data for aggregate testing
4087    fn create_test_stock_data() -> Arc<DataTable> {
4088        let mut table = DataTable::new("stock");
4089
4090        table.add_column(DataColumn::new("symbol"));
4091        table.add_column(DataColumn::new("close"));
4092        table.add_column(DataColumn::new("volume"));
4093
4094        // Add 5 rows of test data
4095        let test_data = vec![
4096            ("AAPL", 99.5, 1000),
4097            ("AAPL", 101.2, 1500),
4098            ("AAPL", 103.5, 2000),
4099            ("AAPL", 105.0, 1200),
4100            ("AAPL", 102.8, 1800),
4101        ];
4102
4103        for (symbol, close, volume) in test_data {
4104            table
4105                .add_row(DataRow::new(vec![
4106                    DataValue::String(symbol.to_string()),
4107                    DataValue::Float(close),
4108                    DataValue::Integer(volume),
4109                ]))
4110                .expect("Should add row successfully");
4111        }
4112
4113        Arc::new(table)
4114    }
4115}
4116
4117#[cfg(test)]
4118#[path = "query_engine_tests.rs"]
4119mod query_engine_tests;