Skip to main content

sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29    TableFunction,
30};
31
32/// Look up a CTE by name with case-insensitive fallback.
33/// Exact match is tried first (fast path); if that fails, a case-insensitive
34/// scan finds tables like `Orders` when the query references `orders`.
35/// This matches MySQL/PostgreSQL behaviour for unquoted identifiers.
36fn resolve_cte<'a>(
37    context: &'a HashMap<String, Arc<DataView>>,
38    name: &str,
39) -> Option<&'a Arc<DataView>> {
40    if let Some(v) = context.get(name) {
41        return Some(v);
42    }
43    let lower = name.to_lowercase();
44    context
45        .iter()
46        .find(|(k, _)| k.to_lowercase() == lower)
47        .map(|(_, v)| v)
48}
49
/// Per-query state used to translate table aliases back to the names they
/// stand for while a statement is being executed.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Alias -> underlying table or CTE name,
    /// e.g. "t" -> "#tmp_trades" or "a" -> "data".
    alias_map: HashMap<String, String>,
}
57
58impl ExecutionContext {
59    /// Create a new empty execution context
60    pub fn new() -> Self {
61        Self {
62            alias_map: HashMap::new(),
63        }
64    }
65
66    /// Register a table alias
67    pub fn register_alias(&mut self, alias: String, table_name: String) {
68        debug!("Registering alias: {} -> {}", alias, table_name);
69        self.alias_map.insert(alias, table_name);
70    }
71
72    /// Resolve an alias to its actual table name
73    /// Returns the alias itself if not found in the map
74    pub fn resolve_alias(&self, name: &str) -> String {
75        self.alias_map
76            .get(name)
77            .cloned()
78            .unwrap_or_else(|| name.to_string())
79    }
80
81    /// Check if a name is a registered alias
82    pub fn is_alias(&self, name: &str) -> bool {
83        self.alias_map.contains_key(name)
84    }
85
86    /// Get a copy of all registered aliases
87    pub fn get_aliases(&self) -> HashMap<String, String> {
88        self.alias_map.clone()
89    }
90
91    /// Resolve a column reference to its index in the table, handling aliases
92    ///
93    /// This is the unified column resolution function that should be used by all
94    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
95    /// alias resolution behavior.
96    ///
97    /// Resolution strategy:
98    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
99    ///    a. Resolve the alias: t -> actual_table_name
100    ///    b. Try qualified lookup: "actual_table_name.amount"
101    ///    c. Fall back to unqualified: "amount"
102    /// 2. If column_ref has no prefix:
103    ///    a. Try simple column name lookup: "amount"
104    ///    b. Try as qualified name if it contains a dot: "table.column"
105    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
106        if let Some(table_prefix) = &column_ref.table_prefix {
107            // Qualified column reference: resolve the alias first
108            let actual_table = self.resolve_alias(table_prefix);
109
110            // Try qualified lookup: "actual_table.column"
111            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
112            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
113                debug!(
114                    "Resolved {}.{} -> qualified column '{}' at index {}",
115                    table_prefix, column_ref.name, qualified_name, idx
116                );
117                return Ok(idx);
118            }
119
120            // Fall back to unqualified lookup
121            if let Some(idx) = table.get_column_index(&column_ref.name) {
122                debug!(
123                    "Resolved {}.{} -> unqualified column '{}' at index {}",
124                    table_prefix, column_ref.name, column_ref.name, idx
125                );
126                return Ok(idx);
127            }
128
129            // Not found with either qualified or unqualified name
130            Err(anyhow!(
131                "Column '{}' not found. Table '{}' may not support qualified column names",
132                qualified_name,
133                actual_table
134            ))
135        } else {
136            // Unqualified column reference
137            if let Some(idx) = table.get_column_index(&column_ref.name) {
138                debug!(
139                    "Resolved unqualified column '{}' at index {}",
140                    column_ref.name, idx
141                );
142                return Ok(idx);
143            }
144
145            // If the column name contains a dot, try it as a qualified name
146            if column_ref.name.contains('.') {
147                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
148                    debug!(
149                        "Resolved '{}' as qualified column at index {}",
150                        column_ref.name, idx
151                    );
152                    return Ok(idx);
153                }
154            }
155
156            // Column not found - provide helpful error
157            let suggestion = self.find_similar_column(table, &column_ref.name);
158            match suggestion {
159                Some(similar) => Err(anyhow!(
160                    "Column '{}' not found. Did you mean '{}'?",
161                    column_ref.name,
162                    similar
163                )),
164                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
165            }
166        }
167    }
168
169    /// Find a similar column name using edit distance (for better error messages)
170    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
171        let columns = table.column_names();
172        let mut best_match: Option<(String, usize)> = None;
173
174        for col in columns {
175            let distance = edit_distance(name, &col);
176            if distance <= 2 {
177                // Allow up to 2 character differences
178                match best_match {
179                    Some((_, best_dist)) if distance < best_dist => {
180                        best_match = Some((col.clone(), distance));
181                    }
182                    None => {
183                        best_match = Some((col.clone(), distance));
184                    }
185                    _ => {}
186                }
187            }
188        }
189
190        best_match.map(|(name, _)| name)
191    }
192}
193
194impl Default for ExecutionContext {
195    fn default() -> Self {
196        Self::new()
197    }
198}
199
/// Levenshtein distance between `a` and `b`, counted in `char`s.
///
/// The result is the minimum number of single-character insertions,
/// deletions and substitutions needed to turn `a` into `b`. An empty string
/// is as far from the other string as that string is long.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only the previous and the current row of the DP table are ever read,
    // so two rolling rows are kept instead of the whole table.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, from) in source.iter().enumerate() {
        // Turning the first `row + 1` chars of `a` into "" costs `row + 1` deletions.
        current[0] = row + 1;
        for (col, to) in target.iter().enumerate() {
            let substitution = previous[col] + if from == to { 0 } else { 1 };
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last computed row lives in `previous`.
    previous[target.len()]
}
240
241/// Query engine that executes SQL directly on `DataTable`
242#[derive(Clone)]
243pub struct QueryEngine {
244    case_insensitive: bool,
245    date_notation: String,
246    _behavior_config: Option<BehaviorConfig>,
247}
248
249impl Default for QueryEngine {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255impl QueryEngine {
256    #[must_use]
257    pub fn new() -> Self {
258        Self {
259            case_insensitive: false,
260            date_notation: get_date_notation(),
261            _behavior_config: None,
262        }
263    }
264
265    #[must_use]
266    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
267        let case_insensitive = config.case_insensitive_default;
268        // Use get_date_notation() to respect environment variable override
269        let date_notation = get_date_notation();
270        Self {
271            case_insensitive,
272            date_notation,
273            _behavior_config: Some(config),
274        }
275    }
276
277    #[must_use]
278    pub fn with_date_notation(_date_notation: String) -> Self {
279        Self {
280            case_insensitive: false,
281            date_notation: get_date_notation(), // Always use the global function
282            _behavior_config: None,
283        }
284    }
285
286    #[must_use]
287    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
288        Self {
289            case_insensitive,
290            date_notation: get_date_notation(),
291            _behavior_config: None,
292        }
293    }
294
295    #[must_use]
296    pub fn with_case_insensitive_and_date_notation(
297        case_insensitive: bool,
298        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
299    ) -> Self {
300        Self {
301            case_insensitive,
302            date_notation: get_date_notation(), // Always use the global function
303            _behavior_config: None,
304        }
305    }
306
307    /// Find a column name similar to the given name using edit distance
308    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
309        let columns = table.column_names();
310        let mut best_match: Option<(String, usize)> = None;
311
312        for col in columns {
313            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
314            // Only suggest if distance is small (likely a typo)
315            // Allow up to 3 edits for longer names
316            let max_distance = if name.len() > 10 { 3 } else { 2 };
317            if distance <= max_distance {
318                match &best_match {
319                    None => best_match = Some((col, distance)),
320                    Some((_, best_dist)) if distance < *best_dist => {
321                        best_match = Some((col, distance));
322                    }
323                    _ => {}
324                }
325            }
326        }
327
328        best_match.map(|(name, _)| name)
329    }
330
331    /// Calculate Levenshtein edit distance between two strings
332    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
333        let len1 = s1.len();
334        let len2 = s2.len();
335        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
336
337        for i in 0..=len1 {
338            matrix[i][0] = i;
339        }
340        for j in 0..=len2 {
341            matrix[0][j] = j;
342        }
343
344        for (i, c1) in s1.chars().enumerate() {
345            for (j, c2) in s2.chars().enumerate() {
346                let cost = usize::from(c1 != c2);
347                matrix[i + 1][j + 1] = std::cmp::min(
348                    matrix[i][j + 1] + 1, // deletion
349                    std::cmp::min(
350                        matrix[i + 1][j] + 1, // insertion
351                        matrix[i][j] + cost,  // substitution
352                    ),
353                );
354            }
355        }
356
357        matrix[len1][len2]
358    }
359
360    /// Check if an expression contains UNNEST function call
361    fn contains_unnest(expr: &SqlExpression) -> bool {
362        match expr {
363            // Direct UNNEST variant
364            SqlExpression::Unnest { .. } => true,
365            SqlExpression::FunctionCall { name, args, .. } => {
366                if name.to_uppercase() == "UNNEST" {
367                    return true;
368                }
369                // Check recursively in function arguments
370                args.iter().any(Self::contains_unnest)
371            }
372            SqlExpression::BinaryOp { left, right, .. } => {
373                Self::contains_unnest(left) || Self::contains_unnest(right)
374            }
375            SqlExpression::Not { expr } => Self::contains_unnest(expr),
376            SqlExpression::CaseExpression {
377                when_branches,
378                else_branch,
379            } => {
380                when_branches.iter().any(|branch| {
381                    Self::contains_unnest(&branch.condition)
382                        || Self::contains_unnest(&branch.result)
383                }) || else_branch
384                    .as_ref()
385                    .map_or(false, |e| Self::contains_unnest(e))
386            }
387            SqlExpression::SimpleCaseExpression {
388                expr,
389                when_branches,
390                else_branch,
391            } => {
392                Self::contains_unnest(expr)
393                    || when_branches.iter().any(|branch| {
394                        Self::contains_unnest(&branch.value)
395                            || Self::contains_unnest(&branch.result)
396                    })
397                    || else_branch
398                        .as_ref()
399                        .map_or(false, |e| Self::contains_unnest(e))
400            }
401            SqlExpression::InList { expr, values } => {
402                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
403            }
404            SqlExpression::NotInList { expr, values } => {
405                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
406            }
407            SqlExpression::Between { expr, lower, upper } => {
408                Self::contains_unnest(expr)
409                    || Self::contains_unnest(lower)
410                    || Self::contains_unnest(upper)
411            }
412            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
413            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
414            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
415            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
416            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
417            SqlExpression::ChainedMethodCall { base, args, .. } => {
418                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
419            }
420            _ => false,
421        }
422    }
423
424    /// Collect all WindowSpecs from an expression (helper for pre-creating contexts)
425    fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
426        match expr {
427            SqlExpression::WindowFunction {
428                window_spec, args, ..
429            } => {
430                // Add this window spec
431                specs.push(window_spec.clone());
432                // Recursively check arguments
433                for arg in args {
434                    Self::collect_window_specs(arg, specs);
435                }
436            }
437            SqlExpression::BinaryOp { left, right, .. } => {
438                Self::collect_window_specs(left, specs);
439                Self::collect_window_specs(right, specs);
440            }
441            SqlExpression::Not { expr } => {
442                Self::collect_window_specs(expr, specs);
443            }
444            SqlExpression::FunctionCall { args, .. } => {
445                for arg in args {
446                    Self::collect_window_specs(arg, specs);
447                }
448            }
449            SqlExpression::CaseExpression {
450                when_branches,
451                else_branch,
452            } => {
453                for branch in when_branches {
454                    Self::collect_window_specs(&branch.condition, specs);
455                    Self::collect_window_specs(&branch.result, specs);
456                }
457                if let Some(else_expr) = else_branch {
458                    Self::collect_window_specs(else_expr, specs);
459                }
460            }
461            SqlExpression::SimpleCaseExpression {
462                expr,
463                when_branches,
464                else_branch,
465            } => {
466                Self::collect_window_specs(expr, specs);
467                for branch in when_branches {
468                    Self::collect_window_specs(&branch.value, specs);
469                    Self::collect_window_specs(&branch.result, specs);
470                }
471                if let Some(else_expr) = else_branch {
472                    Self::collect_window_specs(else_expr, specs);
473                }
474            }
475            SqlExpression::InList { expr, values, .. } => {
476                Self::collect_window_specs(expr, specs);
477                for item in values {
478                    Self::collect_window_specs(item, specs);
479                }
480            }
481            SqlExpression::ChainedMethodCall { base, args, .. } => {
482                Self::collect_window_specs(base, specs);
483                for arg in args {
484                    Self::collect_window_specs(arg, specs);
485                }
486            }
487            // Leaf nodes - no recursion needed
488            SqlExpression::Column(_)
489            | SqlExpression::NumberLiteral(_)
490            | SqlExpression::StringLiteral(_)
491            | SqlExpression::BooleanLiteral(_)
492            | SqlExpression::Null
493            | SqlExpression::DateTimeToday { .. }
494            | SqlExpression::DateTimeConstructor { .. }
495            | SqlExpression::MethodCall { .. } => {}
496            // Catch-all for any other variants
497            _ => {}
498        }
499    }
500
501    /// Check if an expression contains a window function
502    fn contains_window_function(expr: &SqlExpression) -> bool {
503        match expr {
504            SqlExpression::WindowFunction { .. } => true,
505            SqlExpression::BinaryOp { left, right, .. } => {
506                Self::contains_window_function(left) || Self::contains_window_function(right)
507            }
508            SqlExpression::Not { expr } => Self::contains_window_function(expr),
509            SqlExpression::FunctionCall { args, .. } => {
510                args.iter().any(Self::contains_window_function)
511            }
512            SqlExpression::CaseExpression {
513                when_branches,
514                else_branch,
515            } => {
516                when_branches.iter().any(|branch| {
517                    Self::contains_window_function(&branch.condition)
518                        || Self::contains_window_function(&branch.result)
519                }) || else_branch
520                    .as_ref()
521                    .map_or(false, |e| Self::contains_window_function(e))
522            }
523            SqlExpression::SimpleCaseExpression {
524                expr,
525                when_branches,
526                else_branch,
527            } => {
528                Self::contains_window_function(expr)
529                    || when_branches.iter().any(|branch| {
530                        Self::contains_window_function(&branch.value)
531                            || Self::contains_window_function(&branch.result)
532                    })
533                    || else_branch
534                        .as_ref()
535                        .map_or(false, |e| Self::contains_window_function(e))
536            }
537            SqlExpression::InList { expr, values } => {
538                Self::contains_window_function(expr)
539                    || values.iter().any(Self::contains_window_function)
540            }
541            SqlExpression::NotInList { expr, values } => {
542                Self::contains_window_function(expr)
543                    || values.iter().any(Self::contains_window_function)
544            }
545            SqlExpression::Between { expr, lower, upper } => {
546                Self::contains_window_function(expr)
547                    || Self::contains_window_function(lower)
548                    || Self::contains_window_function(upper)
549            }
550            SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
551            SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
552            SqlExpression::MethodCall { args, .. } => {
553                args.iter().any(Self::contains_window_function)
554            }
555            SqlExpression::ChainedMethodCall { base, args, .. } => {
556                Self::contains_window_function(base)
557                    || args.iter().any(Self::contains_window_function)
558            }
559            _ => false,
560        }
561    }
562
563    /// Extract all window function specifications from select items
564    fn extract_window_specs(
565        items: &[SelectItem],
566    ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
567        let mut specs = Vec::new();
568        for (idx, item) in items.iter().enumerate() {
569            if let SelectItem::Expression { expr, .. } = item {
570                Self::collect_window_function_specs(expr, idx, &mut specs);
571            }
572        }
573        specs
574    }
575
576    /// Recursively collect window function specs from an expression
577    fn collect_window_function_specs(
578        expr: &SqlExpression,
579        output_column_index: usize,
580        specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
581    ) {
582        match expr {
583            SqlExpression::WindowFunction {
584                name,
585                args,
586                window_spec,
587            } => {
588                specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
589                    spec: window_spec.clone(),
590                    function_name: name.clone(),
591                    args: args.clone(),
592                    output_column_index,
593                });
594            }
595            SqlExpression::BinaryOp { left, right, .. } => {
596                Self::collect_window_function_specs(left, output_column_index, specs);
597                Self::collect_window_function_specs(right, output_column_index, specs);
598            }
599            SqlExpression::Not { expr } => {
600                Self::collect_window_function_specs(expr, output_column_index, specs);
601            }
602            SqlExpression::FunctionCall { args, .. } => {
603                for arg in args {
604                    Self::collect_window_function_specs(arg, output_column_index, specs);
605                }
606            }
607            SqlExpression::CaseExpression {
608                when_branches,
609                else_branch,
610            } => {
611                for branch in when_branches {
612                    Self::collect_window_function_specs(
613                        &branch.condition,
614                        output_column_index,
615                        specs,
616                    );
617                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
618                }
619                if let Some(e) = else_branch {
620                    Self::collect_window_function_specs(e, output_column_index, specs);
621                }
622            }
623            SqlExpression::SimpleCaseExpression {
624                expr,
625                when_branches,
626                else_branch,
627            } => {
628                Self::collect_window_function_specs(expr, output_column_index, specs);
629                for branch in when_branches {
630                    Self::collect_window_function_specs(&branch.value, output_column_index, specs);
631                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
632                }
633                if let Some(e) = else_branch {
634                    Self::collect_window_function_specs(e, output_column_index, specs);
635                }
636            }
637            SqlExpression::InList { expr, values } => {
638                Self::collect_window_function_specs(expr, output_column_index, specs);
639                for val in values {
640                    Self::collect_window_function_specs(val, output_column_index, specs);
641                }
642            }
643            SqlExpression::NotInList { expr, values } => {
644                Self::collect_window_function_specs(expr, output_column_index, specs);
645                for val in values {
646                    Self::collect_window_function_specs(val, output_column_index, specs);
647                }
648            }
649            SqlExpression::Between { expr, lower, upper } => {
650                Self::collect_window_function_specs(expr, output_column_index, specs);
651                Self::collect_window_function_specs(lower, output_column_index, specs);
652                Self::collect_window_function_specs(upper, output_column_index, specs);
653            }
654            SqlExpression::InSubquery { expr, .. } => {
655                Self::collect_window_function_specs(expr, output_column_index, specs);
656            }
657            SqlExpression::NotInSubquery { expr, .. } => {
658                Self::collect_window_function_specs(expr, output_column_index, specs);
659            }
660            SqlExpression::MethodCall { args, .. } => {
661                for arg in args {
662                    Self::collect_window_function_specs(arg, output_column_index, specs);
663                }
664            }
665            SqlExpression::ChainedMethodCall { base, args, .. } => {
666                Self::collect_window_function_specs(base, output_column_index, specs);
667                for arg in args {
668                    Self::collect_window_function_specs(arg, output_column_index, specs);
669                }
670            }
671            _ => {} // Other expression types don't contain window functions
672        }
673    }
674
675    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
676    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
677        let (view, _plan) = self.execute_with_plan(table, sql)?;
678        Ok(view)
679    }
680
681    /// Execute a SQL query with optional temp table registry access
682    pub fn execute_with_temp_tables(
683        &self,
684        table: Arc<DataTable>,
685        sql: &str,
686        temp_tables: Option<&TempTableRegistry>,
687    ) -> Result<DataView> {
688        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
689        Ok(view)
690    }
691
692    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
693    pub fn execute_statement(
694        &self,
695        table: Arc<DataTable>,
696        statement: SelectStatement,
697    ) -> Result<DataView> {
698        self.execute_statement_with_temp_tables(table, statement, None)
699    }
700
701    /// Execute a parsed SelectStatement with optional temp table access
702    pub fn execute_statement_with_temp_tables(
703        &self,
704        table: Arc<DataTable>,
705        statement: SelectStatement,
706        temp_tables: Option<&TempTableRegistry>,
707    ) -> Result<DataView> {
708        // First process CTEs to build context
709        let mut cte_context = HashMap::new();
710
711        // Add temp tables to CTE context if provided
712        if let Some(temp_registry) = temp_tables {
713            for table_name in temp_registry.list_tables() {
714                if let Some(temp_table) = temp_registry.get(&table_name) {
715                    debug!("Adding temp table {} to CTE context", table_name);
716                    let view = DataView::new(temp_table);
717                    cte_context.insert(table_name, Arc::new(view));
718                }
719            }
720        }
721
722        for cte in &statement.ctes {
723            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
724            // Execute the CTE based on its type
725            let cte_result = match &cte.cte_type {
726                CTEType::Standard(query) => {
727                    // Execute the CTE query (it might reference earlier CTEs)
728                    let view = self.build_view_with_context(
729                        table.clone(),
730                        query.clone(),
731                        &mut cte_context,
732                    )?;
733
734                    // Materialize the view and enrich columns with qualified names
735                    let mut materialized = self.materialize_view(view)?;
736
737                    // Enrich columns with qualified names for proper scoping
738                    for column in materialized.columns_mut() {
739                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
740                        column.source_table = Some(cte.name.clone());
741                    }
742
743                    DataView::new(Arc::new(materialized))
744                }
745                CTEType::Web(web_spec) => {
746                    // Fetch data from URL
747                    use crate::web::http_fetcher::WebDataFetcher;
748
749                    let fetcher = WebDataFetcher::new()?;
750                    // Pass None for query context (no full SQL available in these contexts)
751                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
752
753                    // Enrich columns with qualified names for proper scoping
754                    for column in data_table.columns_mut() {
755                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
756                        column.source_table = Some(cte.name.clone());
757                    }
758
759                    // Convert DataTable to DataView
760                    DataView::new(Arc::new(data_table))
761                }
762                CTEType::File(file_spec) => {
763                    let mut data_table =
764                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
765
766                    for column in data_table.columns_mut() {
767                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
768                        column.source_table = Some(cte.name.clone());
769                    }
770
771                    DataView::new(Arc::new(data_table))
772                }
773            };
774            // Store the result in the context for later use
775            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
776            debug!(
777                "QueryEngine: CTE '{}' pre-processed, stored in context",
778                cte.name
779            );
780        }
781
782        // Now process subqueries with CTE context available
783        let mut subquery_executor =
784            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
785        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
786
787        // Build the view with the same CTE context
788        self.build_view_with_context(table, processed_statement, &mut cte_context)
789    }
790
791    /// Execute a statement with provided CTE context (for subqueries)
792    pub fn execute_statement_with_cte_context(
793        &self,
794        table: Arc<DataTable>,
795        statement: SelectStatement,
796        cte_context: &HashMap<String, Arc<DataView>>,
797    ) -> Result<DataView> {
798        // Clone the context so we can add any CTEs from this statement
799        let mut local_context = cte_context.clone();
800
801        // Process any CTEs in this statement (they might be nested)
802        for cte in &statement.ctes {
803            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
804            let cte_result = match &cte.cte_type {
805                CTEType::Standard(query) => {
806                    let view = self.build_view_with_context(
807                        table.clone(),
808                        query.clone(),
809                        &mut local_context,
810                    )?;
811
812                    // Materialize the view and enrich columns with qualified names
813                    let mut materialized = self.materialize_view(view)?;
814
815                    // Enrich columns with qualified names for proper scoping
816                    for column in materialized.columns_mut() {
817                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
818                        column.source_table = Some(cte.name.clone());
819                    }
820
821                    DataView::new(Arc::new(materialized))
822                }
823                CTEType::Web(web_spec) => {
824                    // Fetch data from URL
825                    use crate::web::http_fetcher::WebDataFetcher;
826
827                    let fetcher = WebDataFetcher::new()?;
828                    // Pass None for query context (no full SQL available in these contexts)
829                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
830
831                    // Enrich columns with qualified names for proper scoping
832                    for column in data_table.columns_mut() {
833                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
834                        column.source_table = Some(cte.name.clone());
835                    }
836
837                    // Convert DataTable to DataView
838                    DataView::new(Arc::new(data_table))
839                }
840                CTEType::File(file_spec) => {
841                    let mut data_table =
842                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
843
844                    for column in data_table.columns_mut() {
845                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
846                        column.source_table = Some(cte.name.clone());
847                    }
848
849                    DataView::new(Arc::new(data_table))
850                }
851            };
852            local_context.insert(cte.name.clone(), Arc::new(cte_result));
853        }
854
855        // Process subqueries with the complete context
856        let mut subquery_executor =
857            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
858        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
859
860        // Build the view
861        self.build_view_with_context(table, processed_statement, &mut local_context)
862    }
863
864    /// Execute a query and return both the result and the execution plan
865    pub fn execute_with_plan(
866        &self,
867        table: Arc<DataTable>,
868        sql: &str,
869    ) -> Result<(DataView, ExecutionPlan)> {
870        self.execute_with_plan_and_temp_tables(table, sql, None)
871    }
872
873    /// Execute a query with temp tables and return both the result and the execution plan
874    pub fn execute_with_plan_and_temp_tables(
875        &self,
876        table: Arc<DataTable>,
877        sql: &str,
878        temp_tables: Option<&TempTableRegistry>,
879    ) -> Result<(DataView, ExecutionPlan)> {
880        let mut plan_builder = ExecutionPlanBuilder::new();
881        let start_time = Instant::now();
882
883        // Parse the SQL query
884        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
885        plan_builder.add_detail(format!("Query: {}", sql));
886        let mut parser = Parser::new(sql);
887        let statement = parser
888            .parse()
889            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
890        plan_builder.add_detail(format!("Parsed successfully"));
891        if let Some(ref from_source) = statement.from_source {
892            match from_source {
893                TableSource::Table(name) => {
894                    plan_builder.add_detail(format!("FROM: {}", name));
895                }
896                TableSource::DerivedTable { alias, .. } => {
897                    plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
898                }
899                TableSource::Pivot { .. } => {
900                    plan_builder.add_detail("FROM: PIVOT".to_string());
901                }
902            }
903        }
904        if statement.where_clause.is_some() {
905            plan_builder.add_detail("WHERE clause present".to_string());
906        }
907        plan_builder.end_step();
908
909        // First process CTEs to build context
910        let mut cte_context = HashMap::new();
911
912        // Add temp tables to CTE context if provided
913        if let Some(temp_registry) = temp_tables {
914            for table_name in temp_registry.list_tables() {
915                if let Some(temp_table) = temp_registry.get(&table_name) {
916                    debug!("Adding temp table {} to CTE context", table_name);
917                    let view = DataView::new(temp_table);
918                    cte_context.insert(table_name, Arc::new(view));
919                }
920            }
921        }
922
923        if !statement.ctes.is_empty() {
924            plan_builder.begin_step(
925                StepType::CTE,
926                format!("Process {} CTEs", statement.ctes.len()),
927            );
928
929            for cte in &statement.ctes {
930                let cte_start = Instant::now();
931                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
932
933                let cte_result = match &cte.cte_type {
934                    CTEType::Standard(query) => {
935                        // Add CTE query details
936                        if let Some(ref from_source) = query.from_source {
937                            match from_source {
938                                TableSource::Table(name) => {
939                                    plan_builder.add_detail(format!("Source: {}", name));
940                                }
941                                TableSource::DerivedTable { alias, .. } => {
942                                    plan_builder
943                                        .add_detail(format!("Source: derived table ({})", alias));
944                                }
945                                TableSource::Pivot { .. } => {
946                                    plan_builder.add_detail("Source: PIVOT".to_string());
947                                }
948                            }
949                        }
950                        if query.where_clause.is_some() {
951                            plan_builder.add_detail("Has WHERE clause".to_string());
952                        }
953                        if query.group_by.is_some() {
954                            plan_builder.add_detail("Has GROUP BY".to_string());
955                        }
956
957                        debug!(
958                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
959                            cte.name,
960                            cte_context.keys().collect::<Vec<_>>()
961                        );
962
963                        // Process subqueries in the CTE's query FIRST
964                        // This allows the subqueries to see all previously defined CTEs
965                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
966                            self.clone(),
967                            table.clone(),
968                            cte_context.clone(),
969                        );
970                        let processed_query = subquery_executor.execute_subqueries(query)?;
971
972                        let view = self.build_view_with_context(
973                            table.clone(),
974                            processed_query,
975                            &mut cte_context,
976                        )?;
977
978                        // Materialize the view and enrich columns with qualified names
979                        let mut materialized = self.materialize_view(view)?;
980
981                        // Enrich columns with qualified names for proper scoping
982                        for column in materialized.columns_mut() {
983                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
984                            column.source_table = Some(cte.name.clone());
985                        }
986
987                        DataView::new(Arc::new(materialized))
988                    }
989                    CTEType::Web(web_spec) => {
990                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
991                        if let Some(format) = &web_spec.format {
992                            plan_builder.add_detail(format!("Format: {:?}", format));
993                        }
994                        if let Some(cache) = web_spec.cache_seconds {
995                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
996                        }
997
998                        // Fetch data from URL
999                        use crate::web::http_fetcher::WebDataFetcher;
1000
1001                        let fetcher = WebDataFetcher::new()?;
1002                        // Pass None for query context - each WEB CTE is independent
1003                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
1004
1005                        // Enrich columns with qualified names for proper scoping
1006                        for column in data_table.columns_mut() {
1007                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1008                            column.source_table = Some(cte.name.clone());
1009                        }
1010
1011                        // Convert DataTable to DataView
1012                        DataView::new(Arc::new(data_table))
1013                    }
1014                    CTEType::File(file_spec) => {
1015                        plan_builder.add_detail(format!("PATH: {}", file_spec.path));
1016                        if file_spec.recursive {
1017                            plan_builder.add_detail("RECURSIVE".to_string());
1018                        }
1019                        if let Some(ref g) = file_spec.glob {
1020                            plan_builder.add_detail(format!("GLOB: {}", g));
1021                        }
1022                        if let Some(d) = file_spec.max_depth {
1023                            plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1024                        }
1025
1026                        let mut data_table =
1027                            crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1028
1029                        for column in data_table.columns_mut() {
1030                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1031                            column.source_table = Some(cte.name.clone());
1032                        }
1033
1034                        DataView::new(Arc::new(data_table))
1035                    }
1036                };
1037
1038                // Record CTE statistics
1039                plan_builder.set_rows_out(cte_result.row_count());
1040                plan_builder.add_detail(format!(
1041                    "Result: {} rows, {} columns",
1042                    cte_result.row_count(),
1043                    cte_result.column_count()
1044                ));
1045                plan_builder.add_detail(format!(
1046                    "Execution time: {:.3}ms",
1047                    cte_start.elapsed().as_secs_f64() * 1000.0
1048                ));
1049
1050                debug!(
1051                    "QueryEngine: Storing CTE '{}' in context with {} rows",
1052                    cte.name,
1053                    cte_result.row_count()
1054                );
1055                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1056                plan_builder.end_step();
1057            }
1058
1059            plan_builder.add_detail(format!(
1060                "All {} CTEs cached in context",
1061                statement.ctes.len()
1062            ));
1063            plan_builder.end_step();
1064        }
1065
1066        // Process subqueries in the statement with CTE context
1067        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1068        let mut subquery_executor =
1069            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1070
1071        // Check if there are subqueries to process
1072        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1073            // This is a simplified check - in reality we'd need to walk the AST
1074            format!("{:?}", w).contains("Subquery")
1075        });
1076
1077        if has_subqueries {
1078            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1079        }
1080
1081        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1082
1083        if has_subqueries {
1084            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1085        } else {
1086            plan_builder.add_detail("No subqueries to process".to_string());
1087        }
1088
1089        plan_builder.end_step();
1090        let result = self.build_view_with_context_and_plan(
1091            table,
1092            processed_statement,
1093            &mut cte_context,
1094            &mut plan_builder,
1095        )?;
1096
1097        let total_duration = start_time.elapsed();
1098        info!(
1099            "Query execution complete: total={:?}, rows={}",
1100            total_duration,
1101            result.row_count()
1102        );
1103
1104        let plan = plan_builder.build();
1105        Ok((result, plan))
1106    }
1107
1108    /// Build a `DataView` from a parsed SQL statement
1109    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1110        let mut cte_context = HashMap::new();
1111        self.build_view_with_context(table, statement, &mut cte_context)
1112    }
1113
1114    /// Build a DataView from a SelectStatement with CTE context
1115    fn build_view_with_context(
1116        &self,
1117        table: Arc<DataTable>,
1118        statement: SelectStatement,
1119        cte_context: &mut HashMap<String, Arc<DataView>>,
1120    ) -> Result<DataView> {
1121        let mut dummy_plan = ExecutionPlanBuilder::new();
1122        let mut exec_context = ExecutionContext::new();
1123        self.build_view_with_context_and_plan_and_exec(
1124            table,
1125            statement,
1126            cte_context,
1127            &mut dummy_plan,
1128            &mut exec_context,
1129        )
1130    }
1131
1132    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
1133    fn build_view_with_context_and_plan(
1134        &self,
1135        table: Arc<DataTable>,
1136        statement: SelectStatement,
1137        cte_context: &mut HashMap<String, Arc<DataView>>,
1138        plan: &mut ExecutionPlanBuilder,
1139    ) -> Result<DataView> {
1140        let mut exec_context = ExecutionContext::new();
1141        self.build_view_with_context_and_plan_and_exec(
1142            table,
1143            statement,
1144            cte_context,
1145            plan,
1146            &mut exec_context,
1147        )
1148    }
1149
1150    /// Build a DataView with CTE context, execution plan, and alias resolution context
1151    fn build_view_with_context_and_plan_and_exec(
1152        &self,
1153        table: Arc<DataTable>,
1154        statement: SelectStatement,
1155        cte_context: &mut HashMap<String, Arc<DataView>>,
1156        plan: &mut ExecutionPlanBuilder,
1157        exec_context: &mut ExecutionContext,
1158    ) -> Result<DataView> {
1159        // First, process any CTEs that aren't already in the context
1160        for cte in &statement.ctes {
1161            // Skip if already processed (e.g., by execute_select for WEB CTEs)
1162            if cte_context.contains_key(&cte.name) {
1163                debug!(
1164                    "QueryEngine: CTE '{}' already in context, skipping",
1165                    cte.name
1166                );
1167                continue;
1168            }
1169
1170            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1171            debug!(
1172                "QueryEngine: Available CTEs for '{}': {:?}",
1173                cte.name,
1174                cte_context.keys().collect::<Vec<_>>()
1175            );
1176
1177            // Execute the CTE query (it might reference earlier CTEs)
1178            let cte_result = match &cte.cte_type {
1179                CTEType::Standard(query) => {
1180                    let view =
1181                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1182
1183                    // Materialize the view and enrich columns with qualified names
1184                    let mut materialized = self.materialize_view(view)?;
1185
1186                    // Enrich columns with qualified names for proper scoping
1187                    for column in materialized.columns_mut() {
1188                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1189                        column.source_table = Some(cte.name.clone());
1190                    }
1191
1192                    DataView::new(Arc::new(materialized))
1193                }
1194                CTEType::Web(_web_spec) => {
1195                    // Web CTEs should have been processed earlier in execute_select
1196                    return Err(anyhow!(
1197                        "Web CTEs should be processed in execute_select method"
1198                    ));
1199                }
1200                CTEType::File(_file_spec) => {
1201                    // FILE CTEs (like WEB) should be processed earlier in execute_select
1202                    return Err(anyhow!(
1203                        "FILE CTEs should be processed in execute_select method"
1204                    ));
1205                }
1206            };
1207
1208            // Store the result in the context for later use
1209            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1210            debug!(
1211                "QueryEngine: CTE '{}' processed, stored in context",
1212                cte.name
1213            );
1214        }
1215
1216        // Determine the source table for the main query
1217        let source_table = if let Some(ref from_source) = statement.from_source {
1218            match from_source {
1219                TableSource::Table(table_name) => {
1220                    // Check if this references a CTE (case-insensitive fallback)
1221                    if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1222                        debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1223                        // Materialize the CTE view as a table
1224                        let mut materialized = self.materialize_view((**cte_view).clone())?;
1225
1226                        // Apply alias to qualified column names if present
1227                        #[allow(deprecated)]
1228                        if let Some(ref alias) = statement.from_alias {
1229                            debug!(
1230                                "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1231                                alias, table_name
1232                            );
1233                            for column in materialized.columns_mut() {
1234                                // Replace the CTE name with the alias in qualified names
1235                                if let Some(ref qualified_name) = column.qualified_name {
1236                                    if qualified_name.starts_with(&format!("{}.", table_name)) {
1237                                        column.qualified_name = Some(qualified_name.replace(
1238                                            &format!("{}.", table_name),
1239                                            &format!("{}.", alias),
1240                                        ));
1241                                    }
1242                                }
1243                                // Update source table to reflect the alias
1244                                if column.source_table.as_ref() == Some(table_name) {
1245                                    column.source_table = Some(alias.clone());
1246                                }
1247                            }
1248                        }
1249
1250                        Arc::new(materialized)
1251                    } else {
1252                        // Regular table reference - use the provided table
1253                        table.clone()
1254                    }
1255                }
1256                TableSource::DerivedTable { query, alias } => {
1257                    // Execute the subquery and use its result as the source
1258                    debug!(
1259                        "QueryEngine: Processing FROM derived table (alias: {})",
1260                        alias
1261                    );
1262                    let subquery_result =
1263                        self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1264
1265                    // Convert the DataView to a DataTable for use as source
1266                    // This materializes the subquery result
1267                    let mut materialized = self.materialize_view(subquery_result)?;
1268
1269                    // Apply the alias to all columns in the derived table
1270                    // Note: We set source_table but keep the column names unqualified
1271                    // so they can be referenced without the table prefix
1272                    for column in materialized.columns_mut() {
1273                        column.source_table = Some(alias.clone());
1274                    }
1275
1276                    Arc::new(materialized)
1277                }
1278                TableSource::Pivot { .. } => {
1279                    // PIVOT should have been expanded by PivotExpander transformer
1280                    return Err(anyhow!(
1281                        "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1282                    ));
1283                }
1284            }
1285        } else {
1286            // Fallback to deprecated fields for backward compatibility
1287            #[allow(deprecated)]
1288            if let Some(ref table_func) = statement.from_function {
1289                // Handle table functions like RANGE()
1290                debug!("QueryEngine: Processing table function (deprecated field)...");
1291                match table_func {
1292                    TableFunction::Generator { name, args } => {
1293                        // Use the generator registry to create the table
1294                        use crate::sql::generators::GeneratorRegistry;
1295
1296                        // Create generator registry (could be cached in QueryEngine)
1297                        let registry = GeneratorRegistry::new();
1298
1299                        if let Some(generator) = registry.get(name) {
1300                            // Evaluate arguments
1301                            let mut evaluator = ArithmeticEvaluator::with_date_notation(
1302                                &table,
1303                                self.date_notation.clone(),
1304                            );
1305                            let dummy_row = 0;
1306
1307                            let mut evaluated_args = Vec::new();
1308                            for arg in args {
1309                                evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1310                            }
1311
1312                            // Generate the table
1313                            generator.generate(evaluated_args)?
1314                        } else {
1315                            return Err(anyhow!("Unknown generator function: {}", name));
1316                        }
1317                    }
1318                }
1319            } else {
1320                #[allow(deprecated)]
1321                if let Some(ref subquery) = statement.from_subquery {
1322                    // Execute the subquery and use its result as the source
1323                    debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1324                    let subquery_result = self.build_view_with_context(
1325                        table.clone(),
1326                        *subquery.clone(),
1327                        cte_context,
1328                    )?;
1329
1330                    // Convert the DataView to a DataTable for use as source
1331                    // This materializes the subquery result
1332                    let materialized = self.materialize_view(subquery_result)?;
1333                    Arc::new(materialized)
1334                } else {
1335                    #[allow(deprecated)]
1336                    if let Some(ref table_name) = statement.from_table {
1337                        // Check if this references a CTE (case-insensitive fallback)
1338                        if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1339                            debug!(
1340                                "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1341                                table_name
1342                            );
1343                            // Materialize the CTE view as a table
1344                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1345
1346                            // Apply alias to qualified column names if present
1347                            #[allow(deprecated)]
1348                            if let Some(ref alias) = statement.from_alias {
1349                                debug!(
1350                                    "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1351                                    alias, table_name
1352                                );
1353                                for column in materialized.columns_mut() {
1354                                    // Replace the CTE name with the alias in qualified names
1355                                    if let Some(ref qualified_name) = column.qualified_name {
1356                                        if qualified_name.starts_with(&format!("{}.", table_name)) {
1357                                            column.qualified_name = Some(qualified_name.replace(
1358                                                &format!("{}.", table_name),
1359                                                &format!("{}.", alias),
1360                                            ));
1361                                        }
1362                                    }
1363                                    // Update source table to reflect the alias
1364                                    if column.source_table.as_ref() == Some(table_name) {
1365                                        column.source_table = Some(alias.clone());
1366                                    }
1367                                }
1368                            }
1369
1370                            Arc::new(materialized)
1371                        } else {
1372                            // Regular table reference - use the provided table
1373                            table.clone()
1374                        }
1375                    } else {
1376                        // No FROM clause - use the provided table
1377                        table.clone()
1378                    }
1379                }
1380            }
1381        };
1382
1383        // Register alias in execution context if present
1384        #[allow(deprecated)]
1385        if let Some(ref alias) = statement.from_alias {
1386            #[allow(deprecated)]
1387            if let Some(ref table_name) = statement.from_table {
1388                exec_context.register_alias(alias.clone(), table_name.clone());
1389            }
1390        }
1391
1392        // Process JOINs if present
1393        let final_table = if !statement.joins.is_empty() {
1394            plan.begin_step(
1395                StepType::Join,
1396                format!("Process {} JOINs", statement.joins.len()),
1397            );
1398            plan.set_rows_in(source_table.row_count());
1399
1400            let join_executor = HashJoinExecutor::new(self.case_insensitive);
1401            let mut current_table = source_table;
1402
1403            for (idx, join_clause) in statement.joins.iter().enumerate() {
1404                let join_start = Instant::now();
1405                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1406                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1407                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1408                plan.add_detail(format!(
1409                    "Executing {:?} JOIN on {} condition(s)",
1410                    join_clause.join_type,
1411                    join_clause.condition.conditions.len()
1412                ));
1413
1414                // Resolve the right table for the join
1415                let right_table = match &join_clause.table {
1416                    TableSource::Table(name) => {
1417                        // Check if it's a CTE reference (case-insensitive fallback)
1418                        if let Some(cte_view) = resolve_cte(cte_context, name) {
1419                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1420
1421                            // Apply alias to qualified column names if present
1422                            if let Some(ref alias) = join_clause.alias {
1423                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1424                                for column in materialized.columns_mut() {
1425                                    // Replace the CTE name with the alias in qualified names
1426                                    if let Some(ref qualified_name) = column.qualified_name {
1427                                        if qualified_name.starts_with(&format!("{}.", name)) {
1428                                            column.qualified_name = Some(qualified_name.replace(
1429                                                &format!("{}.", name),
1430                                                &format!("{}.", alias),
1431                                            ));
1432                                        }
1433                                    }
1434                                    // Update source table to reflect the alias
1435                                    if column.source_table.as_ref() == Some(name) {
1436                                        column.source_table = Some(alias.clone());
1437                                    }
1438                                }
1439                            }
1440
1441                            Arc::new(materialized)
1442                        } else {
1443                            // For now, we need the actual table data
1444                            // In a real implementation, this would load from file
1445                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1446                        }
1447                    }
1448                    TableSource::DerivedTable { query, alias: _ } => {
1449                        // Execute the subquery
1450                        let subquery_result = self.build_view_with_context(
1451                            table.clone(),
1452                            *query.clone(),
1453                            cte_context,
1454                        )?;
1455                        let materialized = self.materialize_view(subquery_result)?;
1456                        Arc::new(materialized)
1457                    }
1458                    TableSource::Pivot { .. } => {
1459                        // PIVOT in JOIN is not supported yet (will be handled by transformer)
1460                        return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1461                    }
1462                };
1463
1464                // Execute the join
1465                let joined = join_executor.execute_join(
1466                    current_table.clone(),
1467                    join_clause,
1468                    right_table.clone(),
1469                )?;
1470
1471                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1472                plan.set_rows_out(joined.row_count());
1473                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1474                plan.add_detail(format!(
1475                    "Join time: {:.3}ms",
1476                    join_start.elapsed().as_secs_f64() * 1000.0
1477                ));
1478                plan.end_step();
1479
1480                current_table = Arc::new(joined);
1481            }
1482
1483            plan.set_rows_out(current_table.row_count());
1484            plan.add_detail(format!(
1485                "Final result after all joins: {} rows",
1486                current_table.row_count()
1487            ));
1488            plan.end_step();
1489            current_table
1490        } else {
1491            source_table
1492        };
1493
1494        // Continue with the existing build_view logic but using final_table
1495        self.build_view_internal_with_plan_and_exec(
1496            final_table,
1497            statement,
1498            plan,
1499            Some(exec_context),
1500        )
1501    }
1502
1503    /// Materialize a DataView into a new DataTable
1504    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1505        let source = view.source();
1506        let mut result_table = DataTable::new("derived");
1507
1508        // Get the visible columns from the view
1509        let visible_cols = view.visible_column_indices().to_vec();
1510
1511        // Copy column definitions
1512        for col_idx in &visible_cols {
1513            let col = &source.columns[*col_idx];
1514            let new_col = DataColumn {
1515                name: col.name.clone(),
1516                data_type: col.data_type.clone(),
1517                nullable: col.nullable,
1518                unique_values: col.unique_values,
1519                null_count: col.null_count,
1520                metadata: col.metadata.clone(),
1521                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1522                source_table: col.source_table.clone(),     // Preserve source table
1523            };
1524            result_table.add_column(new_col);
1525        }
1526
1527        // Copy visible rows
1528        for row_idx in view.visible_row_indices() {
1529            let source_row = &source.rows[*row_idx];
1530            let mut new_row = DataRow { values: Vec::new() };
1531
1532            for col_idx in &visible_cols {
1533                new_row.values.push(source_row.values[*col_idx].clone());
1534            }
1535
1536            result_table.add_row(new_row);
1537        }
1538
1539        Ok(result_table)
1540    }
1541
1542    fn build_view_internal(
1543        &self,
1544        table: Arc<DataTable>,
1545        statement: SelectStatement,
1546    ) -> Result<DataView> {
1547        let mut dummy_plan = ExecutionPlanBuilder::new();
1548        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1549    }
1550
1551    fn build_view_internal_with_plan(
1552        &self,
1553        table: Arc<DataTable>,
1554        statement: SelectStatement,
1555        plan: &mut ExecutionPlanBuilder,
1556    ) -> Result<DataView> {
1557        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1558    }
1559
1560    fn build_view_internal_with_plan_and_exec(
1561        &self,
1562        table: Arc<DataTable>,
1563        statement: SelectStatement,
1564        plan: &mut ExecutionPlanBuilder,
1565        exec_context: Option<&ExecutionContext>,
1566    ) -> Result<DataView> {
1567        debug!(
1568            "QueryEngine::build_view - select_items: {:?}",
1569            statement.select_items
1570        );
1571        debug!(
1572            "QueryEngine::build_view - where_clause: {:?}",
1573            statement.where_clause
1574        );
1575
1576        // Start with all rows visible
1577        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1578
1579        // Apply WHERE clause filtering using recursive evaluator
1580        if let Some(where_clause) = &statement.where_clause {
1581            let total_rows = table.row_count();
1582            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1583            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1584
1585            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1586            plan.set_rows_in(total_rows);
1587            plan.add_detail(format!("Input: {} rows", total_rows));
1588
1589            // Add details about WHERE conditions
1590            for condition in &where_clause.conditions {
1591                plan.add_detail(format!("Condition: {:?}", condition.expr));
1592            }
1593
1594            let filter_start = Instant::now();
1595            // Create an evaluation context for caching compiled regexes
1596            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1597
1598            // Create evaluator ONCE before the loop for performance
1599            let mut evaluator = if let Some(exec_ctx) = exec_context {
1600                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1601                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1602            } else {
1603                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1604            };
1605
1606            // Filter visible rows based on WHERE clause
1607            let mut filtered_rows = Vec::new();
1608            for row_idx in visible_rows {
1609                // Only log for first few rows to avoid performance impact
1610                if row_idx < 3 {
1611                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1612                }
1613
1614                match evaluator.evaluate(where_clause, row_idx) {
1615                    Ok(result) => {
1616                        if row_idx < 3 {
1617                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1618                        }
1619                        if result {
1620                            filtered_rows.push(row_idx);
1621                        }
1622                    }
1623                    Err(e) => {
1624                        if row_idx < 3 {
1625                            debug!(
1626                                "QueryEngine: WHERE evaluation error for row {}: {}",
1627                                row_idx, e
1628                            );
1629                        }
1630                        // Propagate WHERE clause errors instead of silently ignoring them
1631                        return Err(e);
1632                    }
1633                }
1634            }
1635
1636            // Log regex cache statistics
1637            let (compilations, cache_hits) = eval_context.get_stats();
1638            if compilations > 0 || cache_hits > 0 {
1639                debug!(
1640                    "LIKE pattern cache: {} compilations, {} cache hits",
1641                    compilations, cache_hits
1642                );
1643            }
1644            visible_rows = filtered_rows;
1645            let filter_duration = filter_start.elapsed();
1646            info!(
1647                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1648                total_rows,
1649                visible_rows.len(),
1650                filter_duration
1651            );
1652
1653            plan.set_rows_out(visible_rows.len());
1654            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1655            plan.add_detail(format!(
1656                "Filter time: {:.3}ms",
1657                filter_duration.as_secs_f64() * 1000.0
1658            ));
1659            plan.end_step();
1660        }
1661
1662        // Create initial DataView with filtered rows
1663        let mut view = DataView::new(table.clone());
1664        view = view.with_rows(visible_rows);
1665
1666        // Handle GROUP BY if present
1667        if let Some(group_by_exprs) = &statement.group_by {
1668            if !group_by_exprs.is_empty() {
1669                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1670
1671                plan.begin_step(
1672                    StepType::GroupBy,
1673                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1674                );
1675                plan.set_rows_in(view.row_count());
1676                plan.add_detail(format!("Input: {} rows", view.row_count()));
1677                for expr in group_by_exprs {
1678                    plan.add_detail(format!("Group by: {:?}", expr));
1679                }
1680
1681                let group_start = Instant::now();
1682                view = self.apply_group_by(
1683                    view,
1684                    group_by_exprs,
1685                    &statement.select_items,
1686                    statement.having.as_ref(),
1687                    plan,
1688                )?;
1689
1690                // Hide any columns that were promoted from HAVING (synthetic aggregates)
1691                // These have the __hidden_agg_ prefix and should not appear in output
1692                use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1693                let hidden_indices: Vec<usize> = view
1694                    .source()
1695                    .columns
1696                    .iter()
1697                    .enumerate()
1698                    .filter_map(|(i, c)| {
1699                        if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1700                            Some(i)
1701                        } else {
1702                            None
1703                        }
1704                    })
1705                    .collect();
1706                for &idx in hidden_indices.iter().rev() {
1707                    view.hide_column(idx);
1708                }
1709
1710                plan.set_rows_out(view.row_count());
1711                plan.add_detail(format!("Output: {} groups", view.row_count()));
1712                plan.add_detail(format!(
1713                    "Overall time: {:.3}ms",
1714                    group_start.elapsed().as_secs_f64() * 1000.0
1715                ));
1716                plan.end_step();
1717            }
1718        } else {
1719            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1720            if !statement.select_items.is_empty() {
1721                // Check if we have ANY non-star items (not just the first one)
1722                let has_non_star_items = statement
1723                    .select_items
1724                    .iter()
1725                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1726
1727                // Apply select items if:
1728                // 1. We have computed expressions or explicit columns
1729                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1730                if has_non_star_items || statement.select_items.len() > 1 {
1731                    view = self.apply_select_items(
1732                        view,
1733                        &statement.select_items,
1734                        &statement,
1735                        exec_context,
1736                        plan,
1737                    )?;
1738                }
1739                // If it's just a single star, no projection needed
1740            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1741                debug!("QueryEngine: Using legacy columns path");
1742                // Fallback to legacy column projection for backward compatibility
1743                // Use the current view's source table, not the original table
1744                let source_table = view.source();
1745                let column_indices =
1746                    self.resolve_column_indices(source_table, &statement.columns)?;
1747                view = view.with_columns(column_indices);
1748            }
1749        }
1750
1751        // Apply DISTINCT if specified
1752        if statement.distinct {
1753            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1754            plan.set_rows_in(view.row_count());
1755            plan.add_detail(format!("Input: {} rows", view.row_count()));
1756
1757            let distinct_start = Instant::now();
1758            view = self.apply_distinct(view)?;
1759
1760            plan.set_rows_out(view.row_count());
1761            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1762            plan.add_detail(format!(
1763                "Distinct time: {:.3}ms",
1764                distinct_start.elapsed().as_secs_f64() * 1000.0
1765            ));
1766            plan.end_step();
1767        }
1768
1769        // Apply ORDER BY sorting
1770        if let Some(order_by_columns) = &statement.order_by {
1771            if !order_by_columns.is_empty() {
1772                plan.begin_step(
1773                    StepType::Sort,
1774                    format!("ORDER BY {} columns", order_by_columns.len()),
1775                );
1776                plan.set_rows_in(view.row_count());
1777                for col in order_by_columns {
1778                    // Format the expression (simplified for now - just show column name or "expr")
1779                    let expr_str = match &col.expr {
1780                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1781                        _ => "expr".to_string(),
1782                    };
1783                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1784                }
1785
1786                let sort_start = Instant::now();
1787                view =
1788                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1789
1790                plan.add_detail(format!(
1791                    "Sort time: {:.3}ms",
1792                    sort_start.elapsed().as_secs_f64() * 1000.0
1793                ));
1794                plan.end_step();
1795            }
1796        }
1797
1798        // Apply LIMIT/OFFSET
1799        if let Some(limit) = statement.limit {
1800            let offset = statement.offset.unwrap_or(0);
1801            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1802            plan.set_rows_in(view.row_count());
1803            if offset > 0 {
1804                plan.add_detail(format!("OFFSET: {}", offset));
1805            }
1806            view = view.with_limit(limit, offset);
1807            plan.set_rows_out(view.row_count());
1808            plan.add_detail(format!("Output: {} rows", view.row_count()));
1809            plan.end_step();
1810        }
1811
1812        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1813        if !statement.set_operations.is_empty() {
1814            plan.begin_step(
1815                StepType::SetOperation,
1816                format!("Process {} set operations", statement.set_operations.len()),
1817            );
1818            plan.set_rows_in(view.row_count());
1819
1820            // Materialize the first result set
1821            let mut combined_table = self.materialize_view(view)?;
1822            let first_columns = combined_table.column_names();
1823            let first_column_count = first_columns.len();
1824
1825            // Track if any operation requires deduplication
1826            let mut needs_deduplication = false;
1827
1828            // Process each set operation
1829            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1830                let op_start = Instant::now();
1831                plan.begin_step(
1832                    StepType::SetOperation,
1833                    format!("{:?} operation #{}", operation, idx + 1),
1834                );
1835
1836                // Execute the next SELECT statement
1837                // We need to pass the original table and exec_context for proper resolution
1838                let next_view = if let Some(exec_ctx) = exec_context {
1839                    self.build_view_internal_with_plan_and_exec(
1840                        table.clone(),
1841                        *next_statement.clone(),
1842                        plan,
1843                        Some(exec_ctx),
1844                    )?
1845                } else {
1846                    self.build_view_internal_with_plan(
1847                        table.clone(),
1848                        *next_statement.clone(),
1849                        plan,
1850                    )?
1851                };
1852
1853                // Materialize the next result set
1854                let next_table = self.materialize_view(next_view)?;
1855                let next_columns = next_table.column_names();
1856                let next_column_count = next_columns.len();
1857
1858                // Validate schema compatibility
1859                if first_column_count != next_column_count {
1860                    return Err(anyhow!(
1861                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1862                        first_column_count,
1863                        idx + 2,
1864                        next_column_count
1865                    ));
1866                }
1867
1868                // Warn if column names don't match (but allow it - some SQL dialects do)
1869                for (col_idx, (first_col, next_col)) in
1870                    first_columns.iter().zip(next_columns.iter()).enumerate()
1871                {
1872                    if !first_col.eq_ignore_ascii_case(next_col) {
1873                        debug!(
1874                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1875                            col_idx + 1,
1876                            first_col,
1877                            next_col
1878                        );
1879                    }
1880                }
1881
1882                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1883                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1884
1885                // Perform the set operation
1886                match operation {
1887                    SetOperation::UnionAll => {
1888                        // UNION ALL: Simply concatenate all rows without deduplication
1889                        for row in next_table.rows.iter() {
1890                            combined_table.add_row(row.clone());
1891                        }
1892                        plan.add_detail(format!(
1893                            "Result: {} rows (no deduplication)",
1894                            combined_table.row_count()
1895                        ));
1896                    }
1897                    SetOperation::Union => {
1898                        // UNION: Concatenate all rows first, deduplicate at the end
1899                        for row in next_table.rows.iter() {
1900                            combined_table.add_row(row.clone());
1901                        }
1902                        needs_deduplication = true;
1903                        plan.add_detail(format!(
1904                            "Combined: {} rows (deduplication pending)",
1905                            combined_table.row_count()
1906                        ));
1907                    }
1908                    SetOperation::Intersect => {
1909                        // INTERSECT: Keep only rows that appear in both
1910                        // TODO: Implement intersection logic
1911                        return Err(anyhow!("INTERSECT is not yet implemented"));
1912                    }
1913                    SetOperation::Except => {
1914                        // EXCEPT: Keep only rows from left that don't appear in right
1915                        // TODO: Implement except logic
1916                        return Err(anyhow!("EXCEPT is not yet implemented"));
1917                    }
1918                }
1919
1920                plan.add_detail(format!(
1921                    "Operation time: {:.3}ms",
1922                    op_start.elapsed().as_secs_f64() * 1000.0
1923                ));
1924                plan.set_rows_out(combined_table.row_count());
1925                plan.end_step();
1926            }
1927
1928            plan.set_rows_out(combined_table.row_count());
1929            plan.add_detail(format!(
1930                "Combined result: {} rows after {} operations",
1931                combined_table.row_count(),
1932                statement.set_operations.len()
1933            ));
1934            plan.end_step();
1935
1936            // Create a new view from the combined table
1937            view = DataView::new(Arc::new(combined_table));
1938
1939            // Apply deduplication if any UNION (not UNION ALL) operation was used
1940            if needs_deduplication {
1941                plan.begin_step(
1942                    StepType::Distinct,
1943                    "UNION deduplication - remove duplicate rows".to_string(),
1944                );
1945                plan.set_rows_in(view.row_count());
1946                plan.add_detail(format!("Input: {} rows", view.row_count()));
1947
1948                let distinct_start = Instant::now();
1949                view = self.apply_distinct(view)?;
1950
1951                plan.set_rows_out(view.row_count());
1952                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1953                plan.add_detail(format!(
1954                    "Deduplication time: {:.3}ms",
1955                    distinct_start.elapsed().as_secs_f64() * 1000.0
1956                ));
1957                plan.end_step();
1958            }
1959        }
1960
1961        Ok(view)
1962    }
1963
1964    /// Resolve column names to indices
1965    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1966        let mut indices = Vec::new();
1967        let table_columns = table.column_names();
1968
1969        for col_name in columns {
1970            let index = table_columns
1971                .iter()
1972                .position(|c| c.eq_ignore_ascii_case(col_name))
1973                .ok_or_else(|| {
1974                    let suggestion = self.find_similar_column(table, col_name);
1975                    match suggestion {
1976                        Some(similar) => anyhow::anyhow!(
1977                            "Column '{}' not found. Did you mean '{}'?",
1978                            col_name,
1979                            similar
1980                        ),
1981                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1982                    }
1983                })?;
1984            indices.push(index);
1985        }
1986
1987        Ok(indices)
1988    }
1989
1990    /// Apply SELECT items (columns and computed expressions) to create new view
1991    fn apply_select_items(
1992        &self,
1993        view: DataView,
1994        select_items: &[SelectItem],
1995        _statement: &SelectStatement,
1996        exec_context: Option<&ExecutionContext>,
1997        plan: &mut ExecutionPlanBuilder,
1998    ) -> Result<DataView> {
1999        debug!(
2000            "QueryEngine::apply_select_items - items: {:?}",
2001            select_items
2002        );
2003        debug!(
2004            "QueryEngine::apply_select_items - input view has {} rows",
2005            view.row_count()
2006        );
2007
2008        // Check if any select items contain window functions
2009        let has_window_functions = select_items.iter().any(|item| match item {
2010            SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2011            _ => false,
2012        });
2013
2014        // Count window functions for detailed reporting
2015        let window_func_count: usize = select_items
2016            .iter()
2017            .filter(|item| match item {
2018                SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2019                _ => false,
2020            })
2021            .count();
2022
2023        // Start timing for window function evaluation if present
2024        let window_start = if has_window_functions {
2025            debug!(
2026                "QueryEngine::apply_select_items - detected {} window functions",
2027                window_func_count
2028            );
2029
2030            // Extract window specs (Step 2: parallel path, not used yet)
2031            let window_specs = Self::extract_window_specs(select_items);
2032            debug!("Extracted {} window function specs", window_specs.len());
2033
2034            Some(Instant::now())
2035        } else {
2036            None
2037        };
2038
2039        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
2040        let has_unnest = select_items.iter().any(|item| match item {
2041            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2042            _ => false,
2043        });
2044
2045        if has_unnest {
2046            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2047            return self.apply_select_with_row_expansion(view, select_items);
2048        }
2049
2050        // Check if this is an aggregate query:
2051        // 1. At least one aggregate function exists
2052        // 2. All other items are either aggregates or constants (aggregate-compatible)
2053        let has_aggregates = select_items.iter().any(|item| match item {
2054            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2055            SelectItem::Column { .. } => false,
2056            SelectItem::Star { .. } => false,
2057            SelectItem::StarExclude { .. } => false,
2058        });
2059
2060        let all_aggregate_compatible = select_items.iter().all(|item| match item {
2061            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2062            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
2063            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
2064            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
2065        });
2066
2067        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2068            // Special handling for aggregate queries with constants (no GROUP BY)
2069            // These should produce exactly one row
2070            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2071            return self.apply_aggregate_select(view, select_items);
2072        }
2073
2074        // Check if we need to create computed columns
2075        let has_computed_expressions = select_items
2076            .iter()
2077            .any(|item| matches!(item, SelectItem::Expression { .. }));
2078
2079        debug!(
2080            "QueryEngine::apply_select_items - has_computed_expressions: {}",
2081            has_computed_expressions
2082        );
2083
2084        if !has_computed_expressions {
2085            // Simple case: only columns, use existing projection logic
2086            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2087            return Ok(view.with_columns(column_indices));
2088        }
2089
2090        // Complex case: we have computed expressions
2091        // IMPORTANT: We create a PROJECTED view, not a new table
2092        // This preserves the original DataTable reference
2093
2094        let source_table = view.source();
2095        let visible_rows = view.visible_row_indices();
2096
2097        // Create a temporary table just for the computed result view
2098        // But this table is only used for the current query result
2099        let mut computed_table = DataTable::new("query_result");
2100
2101        // First, expand any Star selectors to actual columns
2102        let mut expanded_items = Vec::new();
2103        for item in select_items {
2104            match item {
2105                SelectItem::Star { table_prefix, .. } => {
2106                    if let Some(prefix) = table_prefix {
2107                        // Scoped expansion: table.* expands only columns from that table
2108                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2109                        for col in &source_table.columns {
2110                            if Self::column_matches_table(col, prefix) {
2111                                expanded_items.push(SelectItem::Column {
2112                                    column: ColumnRef::unquoted(col.name.clone()),
2113                                    leading_comments: vec![],
2114                                    trailing_comment: None,
2115                                });
2116                            }
2117                        }
2118                    } else {
2119                        // Unscoped expansion: * expands to all columns
2120                        debug!("QueryEngine::apply_select_items - expanding *");
2121                        for col_name in source_table.column_names() {
2122                            expanded_items.push(SelectItem::Column {
2123                                column: ColumnRef::unquoted(col_name.to_string()),
2124                                leading_comments: vec![],
2125                                trailing_comment: None,
2126                            });
2127                        }
2128                    }
2129                }
2130                _ => expanded_items.push(item.clone()),
2131            }
2132        }
2133
2134        // Add columns based on expanded SelectItems, handling duplicates
2135        let mut column_name_counts: std::collections::HashMap<String, usize> =
2136            std::collections::HashMap::new();
2137
2138        for item in &expanded_items {
2139            let base_name = match item {
2140                SelectItem::Column {
2141                    column: col_ref, ..
2142                } => col_ref.name.clone(),
2143                SelectItem::Expression { alias, .. } => alias.clone(),
2144                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2145                SelectItem::StarExclude { .. } => {
2146                    unreachable!("StarExclude should have been expanded")
2147                }
2148            };
2149
2150            // Check if this column name has been used before
2151            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2152            let column_name = if *count == 0 {
2153                // First occurrence, use the name as-is
2154                base_name.clone()
2155            } else {
2156                // Duplicate, append a suffix
2157                format!("{base_name}_{count}")
2158            };
2159            *count += 1;
2160
2161            computed_table.add_column(DataColumn::new(&column_name));
2162        }
2163
2164        // Check if batch evaluation can be used
2165        // Batch evaluation is the default but we need to check if all window functions
2166        // are standalone (not embedded in expressions)
2167        let can_use_batch = expanded_items.iter().all(|item| {
2168            match item {
2169                SelectItem::Expression { expr, .. } => {
2170                    // Only use batch evaluation if the expression IS a window function,
2171                    // not if it CONTAINS a window function
2172                    matches!(expr, SqlExpression::WindowFunction { .. })
2173                        || !Self::contains_window_function(expr)
2174                }
2175                _ => true, // Non-expressions are fine
2176            }
2177        });
2178
2179        // Batch evaluation is now the default mode for improved performance
2180        // Users can opt-out by setting SQL_CLI_BATCH_WINDOW=0 or false
2181        let use_batch_evaluation = can_use_batch
2182            && std::env::var("SQL_CLI_BATCH_WINDOW")
2183                .map(|v| v != "0" && v.to_lowercase() != "false")
2184                .unwrap_or(true);
2185
2186        // Store window specs for batch evaluation if needed
2187        let batch_window_specs = if use_batch_evaluation && has_window_functions {
2188            debug!("BATCH window function evaluation flag is enabled");
2189            // Extract window specs before timing starts
2190            let specs = Self::extract_window_specs(&expanded_items);
2191            debug!(
2192                "Extracted {} window function specs for batch evaluation",
2193                specs.len()
2194            );
2195            Some(specs)
2196        } else {
2197            None
2198        };
2199
2200        // Calculate values for each row
2201        let mut evaluator =
2202            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2203
2204        // Populate table aliases from exec_context if available
2205        if let Some(exec_ctx) = exec_context {
2206            let aliases = exec_ctx.get_aliases();
2207            if !aliases.is_empty() {
2208                debug!(
2209                    "Applying {} aliases to evaluator: {:?}",
2210                    aliases.len(),
2211                    aliases
2212                );
2213                evaluator = evaluator.with_table_aliases(aliases);
2214            }
2215        }
2216
2217        // OPTIMIZATION: Pre-create WindowContexts before the row loop
2218        // This avoids 50,000+ redundant context lookups
2219        if has_window_functions {
2220            let preload_start = Instant::now();
2221
2222            // Extract all unique WindowSpecs from SELECT items
2223            let mut window_specs = Vec::new();
2224            for item in &expanded_items {
2225                if let SelectItem::Expression { expr, .. } = item {
2226                    Self::collect_window_specs(expr, &mut window_specs);
2227                }
2228            }
2229
2230            // Pre-create all WindowContexts
2231            for spec in &window_specs {
2232                let _ = evaluator.get_or_create_window_context(spec);
2233            }
2234
2235            debug!(
2236                "Pre-created {} WindowContext(s) in {:.2}ms",
2237                window_specs.len(),
2238                preload_start.elapsed().as_secs_f64() * 1000.0
2239            );
2240        }
2241
2242        // Batch evaluation path for window functions
2243        if let Some(window_specs) = batch_window_specs {
2244            debug!("Starting batch window function evaluation");
2245            let batch_start = Instant::now();
2246
2247            // Initialize result table with all rows
2248            let mut batch_results: Vec<Vec<DataValue>> =
2249                vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2250
2251            // Use the window specs we extracted earlier
2252            let detailed_window_specs = &window_specs;
2253
2254            // Group window specs by their WindowSpec for batch processing
2255            let mut specs_by_window: HashMap<
2256                u64,
2257                Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2258            > = HashMap::new();
2259            for spec in detailed_window_specs {
2260                let hash = spec.spec.compute_hash();
2261                specs_by_window
2262                    .entry(hash)
2263                    .or_insert_with(Vec::new)
2264                    .push(spec);
2265            }
2266
2267            // Process each unique window specification
2268            for (_window_hash, specs) in specs_by_window {
2269                // Get the window context (already pre-created)
2270                let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2271
2272                // Process each function using this window
2273                for spec in specs {
2274                    match spec.function_name.as_str() {
2275                        "LAG" => {
2276                            // Extract column and offset from arguments
2277                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2278                                let column_name = col_ref.name.as_str();
2279                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2280                                    spec.args.get(1)
2281                                {
2282                                    n.parse::<i64>().unwrap_or(1)
2283                                } else {
2284                                    1 // default offset
2285                                };
2286
2287                                let values = context.evaluate_lag_batch(
2288                                    visible_rows,
2289                                    column_name,
2290                                    offset,
2291                                )?;
2292
2293                                // Write results to the output column
2294                                for (row_idx, value) in values.into_iter().enumerate() {
2295                                    batch_results[row_idx][spec.output_column_index] = value;
2296                                }
2297                            }
2298                        }
2299                        "LEAD" => {
2300                            // Extract column and offset from arguments
2301                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2302                                let column_name = col_ref.name.as_str();
2303                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2304                                    spec.args.get(1)
2305                                {
2306                                    n.parse::<i64>().unwrap_or(1)
2307                                } else {
2308                                    1 // default offset
2309                                };
2310
2311                                let values = context.evaluate_lead_batch(
2312                                    visible_rows,
2313                                    column_name,
2314                                    offset,
2315                                )?;
2316
2317                                // Write results to the output column
2318                                for (row_idx, value) in values.into_iter().enumerate() {
2319                                    batch_results[row_idx][spec.output_column_index] = value;
2320                                }
2321                            }
2322                        }
2323                        "ROW_NUMBER" => {
2324                            let values = context.evaluate_row_number_batch(visible_rows)?;
2325
2326                            // Write results to the output column
2327                            for (row_idx, value) in values.into_iter().enumerate() {
2328                                batch_results[row_idx][spec.output_column_index] = value;
2329                            }
2330                        }
2331                        "RANK" => {
2332                            let values = context.evaluate_rank_batch(visible_rows)?;
2333
2334                            // Write results to the output column
2335                            for (row_idx, value) in values.into_iter().enumerate() {
2336                                batch_results[row_idx][spec.output_column_index] = value;
2337                            }
2338                        }
2339                        "DENSE_RANK" => {
2340                            let values = context.evaluate_dense_rank_batch(visible_rows)?;
2341
2342                            // Write results to the output column
2343                            for (row_idx, value) in values.into_iter().enumerate() {
2344                                batch_results[row_idx][spec.output_column_index] = value;
2345                            }
2346                        }
2347                        "SUM" => {
2348                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2349                                let column_name = col_ref.name.as_str();
2350                                let values =
2351                                    context.evaluate_sum_batch(visible_rows, column_name)?;
2352
2353                                for (row_idx, value) in values.into_iter().enumerate() {
2354                                    batch_results[row_idx][spec.output_column_index] = value;
2355                                }
2356                            }
2357                        }
2358                        "AVG" => {
2359                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2360                                let column_name = col_ref.name.as_str();
2361                                let values =
2362                                    context.evaluate_avg_batch(visible_rows, column_name)?;
2363
2364                                for (row_idx, value) in values.into_iter().enumerate() {
2365                                    batch_results[row_idx][spec.output_column_index] = value;
2366                                }
2367                            }
2368                        }
2369                        "MIN" => {
2370                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2371                                let column_name = col_ref.name.as_str();
2372                                let values =
2373                                    context.evaluate_min_batch(visible_rows, column_name)?;
2374
2375                                for (row_idx, value) in values.into_iter().enumerate() {
2376                                    batch_results[row_idx][spec.output_column_index] = value;
2377                                }
2378                            }
2379                        }
2380                        "MAX" => {
2381                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2382                                let column_name = col_ref.name.as_str();
2383                                let values =
2384                                    context.evaluate_max_batch(visible_rows, column_name)?;
2385
2386                                for (row_idx, value) in values.into_iter().enumerate() {
2387                                    batch_results[row_idx][spec.output_column_index] = value;
2388                                }
2389                            }
2390                        }
2391                        "COUNT" => {
2392                            // COUNT can be COUNT(*) or COUNT(column)
2393                            let column_name = match spec.args.get(0) {
2394                                Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2395                                Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2396                                _ => None,
2397                            };
2398
2399                            let values = context.evaluate_count_batch(visible_rows, column_name)?;
2400
2401                            for (row_idx, value) in values.into_iter().enumerate() {
2402                                batch_results[row_idx][spec.output_column_index] = value;
2403                            }
2404                        }
2405                        "FIRST_VALUE" => {
2406                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2407                                let column_name = col_ref.name.as_str();
2408                                let values = context
2409                                    .evaluate_first_value_batch(visible_rows, column_name)?;
2410
2411                                for (row_idx, value) in values.into_iter().enumerate() {
2412                                    batch_results[row_idx][spec.output_column_index] = value;
2413                                }
2414                            }
2415                        }
2416                        "LAST_VALUE" => {
2417                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2418                                let column_name = col_ref.name.as_str();
2419                                let values =
2420                                    context.evaluate_last_value_batch(visible_rows, column_name)?;
2421
2422                                for (row_idx, value) in values.into_iter().enumerate() {
2423                                    batch_results[row_idx][spec.output_column_index] = value;
2424                                }
2425                            }
2426                        }
2427                        _ => {
2428                            // Fall back to per-row evaluation for unsupported functions
2429                            debug!(
2430                                "Window function {} not supported in batch mode, using per-row",
2431                                spec.function_name
2432                            );
2433                        }
2434                    }
2435                }
2436            }
2437
2438            // Now evaluate non-window columns
2439            for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2440                for (col_idx, item) in expanded_items.iter().enumerate() {
2441                    // Skip if this column was already filled by a window function
2442                    if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2443                        continue;
2444                    }
2445
2446                    let value = match item {
2447                        SelectItem::Column {
2448                            column: col_ref, ..
2449                        } => {
2450                            match evaluator
2451                                .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2452                            {
2453                                Ok(val) => val,
2454                                Err(e) => {
2455                                    return Err(anyhow!(
2456                                        "Failed to evaluate column {}: {}",
2457                                        col_ref.to_sql(),
2458                                        e
2459                                    ));
2460                                }
2461                            }
2462                        }
2463                        SelectItem::Expression { expr, .. } => {
2464                            // For batch evaluation, we need to handle expressions differently
2465                            // If this is just a window function by itself, we already computed it
2466                            if matches!(expr, SqlExpression::WindowFunction { .. }) {
2467                                // Pure window function - already handled in batch evaluation
2468                                continue;
2469                            }
2470                            // For expressions containing window functions or regular expressions,
2471                            // evaluate them normally
2472                            evaluator.evaluate(&expr, source_row_idx)?
2473                        }
2474                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2475                        SelectItem::StarExclude { .. } => {
2476                            unreachable!("StarExclude should have been expanded")
2477                        }
2478                    };
2479                    batch_results[result_row_idx][col_idx] = value;
2480                }
2481            }
2482
2483            // Add all rows to the table
2484            for row_values in batch_results {
2485                computed_table
2486                    .add_row(DataRow::new(row_values))
2487                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2488            }
2489
2490            debug!(
2491                "Batch window evaluation completed in {:.3}ms",
2492                batch_start.elapsed().as_secs_f64() * 1000.0
2493            );
2494        } else {
2495            // Original per-row evaluation path
2496            for &row_idx in visible_rows {
2497                let mut row_values = Vec::new();
2498
2499                for item in &expanded_items {
2500                    let value = match item {
2501                        SelectItem::Column {
2502                            column: col_ref, ..
2503                        } => {
2504                            // Use evaluator for column resolution (handles aliases properly)
2505                            match evaluator
2506                                .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2507                            {
2508                                Ok(val) => val,
2509                                Err(e) => {
2510                                    return Err(anyhow!(
2511                                        "Failed to evaluate column {}: {}",
2512                                        col_ref.to_sql(),
2513                                        e
2514                                    ));
2515                                }
2516                            }
2517                        }
2518                        SelectItem::Expression { expr, .. } => {
2519                            // Computed expression
2520                            evaluator.evaluate(&expr, row_idx)?
2521                        }
2522                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2523                        SelectItem::StarExclude { .. } => {
2524                            unreachable!("StarExclude should have been expanded")
2525                        }
2526                    };
2527                    row_values.push(value);
2528                }
2529
2530                computed_table
2531                    .add_row(DataRow::new(row_values))
2532                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2533            }
2534        }
2535
2536        // Log window function timing if applicable
2537        if let Some(start) = window_start {
2538            let window_duration = start.elapsed();
2539            info!(
2540                "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2541                window_duration.as_secs_f64() * 1000.0,
2542                visible_rows.len(),
2543                window_func_count
2544            );
2545
2546            // Add to execution plan
2547            plan.begin_step(
2548                StepType::WindowFunction,
2549                format!("Evaluate {} window function(s)", window_func_count),
2550            );
2551            plan.set_rows_in(visible_rows.len());
2552            plan.set_rows_out(visible_rows.len());
2553            plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2554            plan.add_detail(format!("{} window functions evaluated", window_func_count));
2555            plan.add_detail(format!(
2556                "Evaluation time: {:.3}ms",
2557                window_duration.as_secs_f64() * 1000.0
2558            ));
2559            plan.end_step();
2560        }
2561
2562        // Return a view of the computed result
2563        // This is a temporary view for this query only
2564        Ok(DataView::new(Arc::new(computed_table)))
2565    }
2566
2567    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
2568    fn apply_select_with_row_expansion(
2569        &self,
2570        view: DataView,
2571        select_items: &[SelectItem],
2572    ) -> Result<DataView> {
2573        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2574
2575        let source_table = view.source();
2576        let visible_rows = view.visible_row_indices();
2577        let expander_registry = RowExpanderRegistry::new();
2578
2579        // Create result table
2580        let mut result_table = DataTable::new("unnest_result");
2581
2582        // Expand * to columns and set up result columns
2583        let mut expanded_items = Vec::new();
2584        for item in select_items {
2585            match item {
2586                SelectItem::Star { table_prefix, .. } => {
2587                    if let Some(prefix) = table_prefix {
2588                        // Scoped expansion: table.* expands only columns from that table
2589                        debug!(
2590                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2591                            prefix
2592                        );
2593                        for col in &source_table.columns {
2594                            if Self::column_matches_table(col, prefix) {
2595                                expanded_items.push(SelectItem::Column {
2596                                    column: ColumnRef::unquoted(col.name.clone()),
2597                                    leading_comments: vec![],
2598                                    trailing_comment: None,
2599                                });
2600                            }
2601                        }
2602                    } else {
2603                        // Unscoped expansion: * expands to all columns
2604                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2605                        for col_name in source_table.column_names() {
2606                            expanded_items.push(SelectItem::Column {
2607                                column: ColumnRef::unquoted(col_name.to_string()),
2608                                leading_comments: vec![],
2609                                trailing_comment: None,
2610                            });
2611                        }
2612                    }
2613                }
2614                _ => expanded_items.push(item.clone()),
2615            }
2616        }
2617
2618        // Add columns to result table
2619        for item in &expanded_items {
2620            let column_name = match item {
2621                SelectItem::Column {
2622                    column: col_ref, ..
2623                } => col_ref.name.clone(),
2624                SelectItem::Expression { alias, .. } => alias.clone(),
2625                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2626                SelectItem::StarExclude { .. } => {
2627                    unreachable!("StarExclude should have been expanded")
2628                }
2629            };
2630            result_table.add_column(DataColumn::new(&column_name));
2631        }
2632
2633        // Process each input row
2634        let mut evaluator =
2635            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2636
2637        for &row_idx in visible_rows {
2638            // First pass: identify UNNEST expressions and collect their expansion arrays
2639            let mut unnest_expansions = Vec::new();
2640            let mut unnest_indices = Vec::new();
2641
2642            for (col_idx, item) in expanded_items.iter().enumerate() {
2643                if let SelectItem::Expression { expr, .. } = item {
2644                    if let Some(expansion_result) = self.try_expand_unnest(
2645                        &expr,
2646                        source_table,
2647                        row_idx,
2648                        &mut evaluator,
2649                        &expander_registry,
2650                    )? {
2651                        unnest_expansions.push(expansion_result);
2652                        unnest_indices.push(col_idx);
2653                    }
2654                }
2655            }
2656
2657            // Determine how many output rows to generate
2658            let expansion_count = if unnest_expansions.is_empty() {
2659                1 // No UNNEST, just one row
2660            } else {
2661                unnest_expansions
2662                    .iter()
2663                    .map(|exp| exp.row_count())
2664                    .max()
2665                    .unwrap_or(1)
2666            };
2667
2668            // Generate output rows
2669            for output_idx in 0..expansion_count {
2670                let mut row_values = Vec::new();
2671
2672                for (col_idx, item) in expanded_items.iter().enumerate() {
2673                    // Check if this column is an UNNEST column
2674                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2675
2676                    let value = if let Some(unnest_idx) = unnest_position {
2677                        // Get value from expansion array (or NULL if exhausted)
2678                        let expansion = &unnest_expansions[unnest_idx];
2679                        expansion
2680                            .values
2681                            .get(output_idx)
2682                            .cloned()
2683                            .unwrap_or(DataValue::Null)
2684                    } else {
2685                        // Regular column or non-UNNEST expression - replicate from input
2686                        match item {
2687                            SelectItem::Column {
2688                                column: col_ref, ..
2689                            } => {
2690                                let col_idx =
2691                                    source_table.get_column_index(&col_ref.name).ok_or_else(
2692                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2693                                    )?;
2694                                let row = source_table
2695                                    .get_row(row_idx)
2696                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2697                                row.get(col_idx)
2698                                    .ok_or_else(|| {
2699                                        anyhow::anyhow!("Column {} not found in row", col_idx)
2700                                    })?
2701                                    .clone()
2702                            }
2703                            SelectItem::Expression { expr, .. } => {
2704                                // Non-UNNEST expression - evaluate once and replicate
2705                                evaluator.evaluate(&expr, row_idx)?
2706                            }
2707                            SelectItem::Star { .. } => unreachable!(),
2708                            SelectItem::StarExclude { .. } => {
2709                                unreachable!("StarExclude should have been expanded")
2710                            }
2711                        }
2712                    };
2713
2714                    row_values.push(value);
2715                }
2716
2717                result_table
2718                    .add_row(DataRow::new(row_values))
2719                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2720            }
2721        }
2722
2723        debug!(
2724            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2725            visible_rows.len(),
2726            result_table.row_count()
2727        );
2728
2729        Ok(DataView::new(Arc::new(result_table)))
2730    }
2731
2732    /// Try to expand an expression if it's an UNNEST call
2733    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
2734    fn try_expand_unnest(
2735        &self,
2736        expr: &SqlExpression,
2737        _source_table: &DataTable,
2738        row_idx: usize,
2739        evaluator: &mut ArithmeticEvaluator,
2740        expander_registry: &RowExpanderRegistry,
2741    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2742        // Check for UNNEST variant (direct syntax)
2743        if let SqlExpression::Unnest { column, delimiter } = expr {
2744            // Evaluate the column expression
2745            let column_value = evaluator.evaluate(column, row_idx)?;
2746
2747            // Delimiter is already a string literal
2748            let delimiter_value = DataValue::String(delimiter.clone());
2749
2750            // Get the UNNEST expander
2751            let expander = expander_registry
2752                .get("UNNEST")
2753                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2754
2755            // Expand the value
2756            let expansion = expander.expand(&column_value, &[delimiter_value])?;
2757            return Ok(Some(expansion));
2758        }
2759
2760        // Also check for FunctionCall form (for compatibility)
2761        if let SqlExpression::FunctionCall { name, args, .. } = expr {
2762            if name.to_uppercase() == "UNNEST" {
2763                // UNNEST(column, delimiter)
2764                if args.len() != 2 {
2765                    return Err(anyhow::anyhow!(
2766                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2767                    ));
2768                }
2769
2770                // Evaluate the column expression (first arg)
2771                let column_value = evaluator.evaluate(&args[0], row_idx)?;
2772
2773                // Evaluate the delimiter expression (second arg)
2774                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2775
2776                // Get the UNNEST expander
2777                let expander = expander_registry
2778                    .get("UNNEST")
2779                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2780
2781                // Expand the value
2782                let expansion = expander.expand(&column_value, &[delimiter_value])?;
2783                return Ok(Some(expansion));
2784            }
2785        }
2786
2787        Ok(None)
2788    }
2789
2790    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
2791    fn apply_aggregate_select(
2792        &self,
2793        view: DataView,
2794        select_items: &[SelectItem],
2795    ) -> Result<DataView> {
2796        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2797
2798        let source_table = view.source();
2799        let mut result_table = DataTable::new("aggregate_result");
2800
2801        // Add columns for each select item
2802        for item in select_items {
2803            let column_name = match item {
2804                SelectItem::Expression { alias, .. } => alias.clone(),
2805                _ => unreachable!("Should only have expressions in aggregate-only query"),
2806            };
2807            result_table.add_column(DataColumn::new(&column_name));
2808        }
2809
2810        // Create evaluator with visible rows from the view (for filtered aggregates)
2811        let visible_rows = view.visible_row_indices().to_vec();
2812        let mut evaluator =
2813            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2814                .with_visible_rows(visible_rows);
2815
2816        // Evaluate each aggregate expression once (they handle all rows internally)
2817        let mut row_values = Vec::new();
2818        for item in select_items {
2819            match item {
2820                SelectItem::Expression { expr, .. } => {
2821                    // The evaluator will handle aggregates over all rows
2822                    // We pass row_index=0 but aggregates ignore it and process all rows
2823                    let value = evaluator.evaluate(expr, 0)?;
2824                    row_values.push(value);
2825                }
2826                _ => unreachable!("Should only have expressions in aggregate-only query"),
2827            }
2828        }
2829
2830        // Add the single result row
2831        result_table
2832            .add_row(DataRow::new(row_values))
2833            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2834
2835        Ok(DataView::new(Arc::new(result_table)))
2836    }
2837
2838    /// Check if a column belongs to a specific table based on source_table or qualified_name
2839    ///
2840    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2841    /// to filter which columns should be included.
2842    ///
2843    /// # Arguments
2844    /// * `col` - The column to check
2845    /// * `table_name` - The table name or alias to match against
2846    ///
2847    /// # Returns
2848    /// `true` if the column belongs to the specified table
2849    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2850        // First, check the source_table field
2851        if let Some(ref source) = col.source_table {
2852            // Direct match or matches with schema qualification
2853            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2854                return true;
2855            }
2856        }
2857
2858        // Second, check the qualified_name field
2859        if let Some(ref qualified) = col.qualified_name {
2860            // Check if qualified name starts with "table_name."
2861            if qualified.starts_with(&format!("{}.", table_name)) {
2862                return true;
2863            }
2864        }
2865
2866        false
2867    }
2868
2869    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2870    fn resolve_select_columns(
2871        &self,
2872        table: &DataTable,
2873        select_items: &[SelectItem],
2874    ) -> Result<Vec<usize>> {
2875        let mut indices = Vec::new();
2876        let table_columns = table.column_names();
2877
2878        for item in select_items {
2879            match item {
2880                SelectItem::Column {
2881                    column: col_ref, ..
2882                } => {
2883                    // Check if this has a table prefix
2884                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2885                        // For qualified references, ONLY try qualified lookup - no fallback
2886                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2887                        table.find_column_by_qualified_name(&qualified_name)
2888                            .ok_or_else(|| {
2889                                // Check if any columns have qualified names for better error message
2890                                let has_qualified = table.columns.iter()
2891                                    .any(|c| c.qualified_name.is_some());
2892                                if !has_qualified {
2893                                    anyhow::anyhow!(
2894                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2895                                        qualified_name, table_prefix
2896                                    )
2897                                } else {
2898                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2899                                }
2900                            })?
2901                    } else {
2902                        // Simple column name lookup
2903                        table_columns
2904                            .iter()
2905                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2906                            .ok_or_else(|| {
2907                                let suggestion = self.find_similar_column(table, &col_ref.name);
2908                                match suggestion {
2909                                    Some(similar) => anyhow::anyhow!(
2910                                        "Column '{}' not found. Did you mean '{}'?",
2911                                        col_ref.name,
2912                                        similar
2913                                    ),
2914                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2915                                }
2916                            })?
2917                    };
2918                    indices.push(index);
2919                }
2920                SelectItem::Star { table_prefix, .. } => {
2921                    if let Some(prefix) = table_prefix {
2922                        // Scoped expansion: table.* expands only columns from that table
2923                        for (i, col) in table.columns.iter().enumerate() {
2924                            if Self::column_matches_table(col, prefix) {
2925                                indices.push(i);
2926                            }
2927                        }
2928                    } else {
2929                        // Unscoped expansion: * expands to all column indices
2930                        for i in 0..table_columns.len() {
2931                            indices.push(i);
2932                        }
2933                    }
2934                }
2935                SelectItem::StarExclude {
2936                    table_prefix,
2937                    excluded_columns,
2938                    ..
2939                } => {
2940                    // Expand all columns (with optional table prefix), then exclude specified ones
2941                    if let Some(prefix) = table_prefix {
2942                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2943                        for (i, col) in table.columns.iter().enumerate() {
2944                            if Self::column_matches_table(col, prefix)
2945                                && !excluded_columns.contains(&col.name)
2946                            {
2947                                indices.push(i);
2948                            }
2949                        }
2950                    } else {
2951                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2952                        for (i, col_name) in table_columns.iter().enumerate() {
2953                            if !excluded_columns
2954                                .iter()
2955                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2956                            {
2957                                indices.push(i);
2958                            }
2959                        }
2960                    }
2961                }
2962                SelectItem::Expression { .. } => {
2963                    return Err(anyhow::anyhow!(
2964                        "Computed expressions require new table creation"
2965                    ));
2966                }
2967            }
2968        }
2969
2970        Ok(indices)
2971    }
2972
2973    /// Apply DISTINCT to remove duplicate rows
2974    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2975        use std::collections::HashSet;
2976
2977        let source = view.source();
2978        let visible_cols = view.visible_column_indices();
2979        let visible_rows = view.visible_row_indices();
2980
2981        // Build a set to track unique rows
2982        let mut seen_rows = HashSet::new();
2983        let mut unique_row_indices = Vec::new();
2984
2985        for &row_idx in visible_rows {
2986            // Build a key representing this row's visible column values
2987            let mut row_key = Vec::new();
2988            for &col_idx in visible_cols {
2989                let value = source
2990                    .get_value(row_idx, col_idx)
2991                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2992                // Convert value to a hashable representation
2993                row_key.push(format!("{:?}", value));
2994            }
2995
2996            // Check if we've seen this row before
2997            if seen_rows.insert(row_key) {
2998                // First time seeing this row combination
2999                unique_row_indices.push(row_idx);
3000            }
3001        }
3002
3003        // Create a new view with only unique rows
3004        Ok(view.with_rows(unique_row_indices))
3005    }
3006
3007    /// Apply multi-column ORDER BY sorting to the view
3008    fn apply_multi_order_by(
3009        &self,
3010        view: DataView,
3011        order_by_columns: &[OrderByItem],
3012    ) -> Result<DataView> {
3013        self.apply_multi_order_by_with_context(view, order_by_columns, None)
3014    }
3015
3016    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
3017    fn apply_multi_order_by_with_context(
3018        &self,
3019        mut view: DataView,
3020        order_by_columns: &[OrderByItem],
3021        _exec_context: Option<&ExecutionContext>,
3022    ) -> Result<DataView> {
3023        // Build list of (source_column_index, ascending) tuples
3024        let mut sort_columns = Vec::new();
3025
3026        for order_col in order_by_columns {
3027            // Extract column name from expression (currently only supports simple columns)
3028            let column_name = match &order_col.expr {
3029                SqlExpression::Column(col_ref) => col_ref.name.clone(),
3030                _ => {
3031                    // TODO: Support expression evaluation in ORDER BY
3032                    return Err(anyhow!(
3033                        "ORDER BY expressions not yet supported - only simple columns allowed"
3034                    ));
3035                }
3036            };
3037
3038            // Try to find the column index, handling qualified column names (table.column)
3039            let col_index = if column_name.contains('.') {
3040                // Qualified column name - extract unqualified part
3041                if let Some(dot_pos) = column_name.rfind('.') {
3042                    let col_name = &column_name[dot_pos + 1..];
3043
3044                    // After SELECT processing, columns are unqualified
3045                    // So just use the column name part
3046                    debug!(
3047                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
3048                        col_name, column_name
3049                    );
3050                    view.source().get_column_index(col_name)
3051                } else {
3052                    view.source().get_column_index(&column_name)
3053                }
3054            } else {
3055                // Simple column name
3056                view.source().get_column_index(&column_name)
3057            }
3058            .ok_or_else(|| {
3059                // If not found, provide helpful error with suggestions
3060                let suggestion = self.find_similar_column(view.source(), &column_name);
3061                match suggestion {
3062                    Some(similar) => anyhow::anyhow!(
3063                        "Column '{}' not found. Did you mean '{}'?",
3064                        column_name,
3065                        similar
3066                    ),
3067                    None => {
3068                        // Also list available columns for debugging
3069                        let available_cols = view.source().column_names().join(", ");
3070                        anyhow::anyhow!(
3071                            "Column '{}' not found. Available columns: {}",
3072                            column_name,
3073                            available_cols
3074                        )
3075                    }
3076                }
3077            })?;
3078
3079            let ascending = matches!(order_col.direction, SortDirection::Asc);
3080            sort_columns.push((col_index, ascending));
3081        }
3082
3083        // Apply multi-column sorting
3084        view.apply_multi_sort(&sort_columns)?;
3085        Ok(view)
3086    }
3087
3088    /// Apply GROUP BY to the view with optional HAVING clause
3089    fn apply_group_by(
3090        &self,
3091        view: DataView,
3092        group_by_exprs: &[SqlExpression],
3093        select_items: &[SelectItem],
3094        having: Option<&SqlExpression>,
3095        plan: &mut ExecutionPlanBuilder,
3096    ) -> Result<DataView> {
3097        // Use the new expression-based GROUP BY implementation
3098        let (result_view, phase_info) = self.apply_group_by_expressions(
3099            view,
3100            group_by_exprs,
3101            select_items,
3102            having,
3103            self.case_insensitive,
3104            self.date_notation.clone(),
3105        )?;
3106
3107        // Add detailed phase information to the execution plan
3108        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3109        plan.add_detail(format!(
3110            "Phase 1 - Group Building: {:.3}ms",
3111            phase_info.phase2_key_building.as_secs_f64() * 1000.0
3112        ));
3113        plan.add_detail(format!(
3114            "  • Processing {} rows into {} groups",
3115            phase_info.total_rows, phase_info.num_groups
3116        ));
3117        plan.add_detail(format!(
3118            "Phase 2 - Aggregation: {:.3}ms",
3119            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3120        ));
3121        if phase_info.phase4_having_evaluation > Duration::ZERO {
3122            plan.add_detail(format!(
3123                "Phase 3 - HAVING Filter: {:.3}ms",
3124                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3125            ));
3126            plan.add_detail(format!(
3127                "  • Filtered {} groups",
3128                phase_info.groups_filtered_by_having
3129            ));
3130        }
3131        plan.add_detail(format!(
3132            "Total GROUP BY time: {:.3}ms",
3133            phase_info.total_time.as_secs_f64() * 1000.0
3134        ));
3135
3136        Ok(result_view)
3137    }
3138
3139    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
3140    /// This helps pre-size hash tables for better performance
3141    pub fn estimate_group_cardinality(
3142        &self,
3143        view: &DataView,
3144        group_by_exprs: &[SqlExpression],
3145    ) -> usize {
3146        // If we have few rows, just return the row count as upper bound
3147        let row_count = view.get_visible_rows().len();
3148        if row_count <= 100 {
3149            return row_count;
3150        }
3151
3152        // Sample first 1000 rows or 10% of data, whichever is smaller
3153        let sample_size = min(1000, row_count / 10).max(100);
3154        let mut seen = FxHashSet::default();
3155
3156        let visible_rows = view.get_visible_rows();
3157        for (i, &row_idx) in visible_rows.iter().enumerate() {
3158            if i >= sample_size {
3159                break;
3160            }
3161
3162            // Evaluate GROUP BY expressions for this row
3163            let mut key_values = Vec::new();
3164            for expr in group_by_exprs {
3165                let mut evaluator = ArithmeticEvaluator::new(view.source());
3166                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3167                key_values.push(value);
3168            }
3169
3170            seen.insert(key_values);
3171        }
3172
3173        // Estimate total cardinality based on sample
3174        let sample_cardinality = seen.len();
3175        let estimated = (sample_cardinality * row_count) / sample_size;
3176
3177        // Cap at row count and ensure minimum of sample cardinality
3178        estimated.min(row_count).max(sample_cardinality)
3179    }
3180}
3181
3182#[cfg(test)]
3183mod tests {
3184    use super::*;
3185    use crate::data::datatable::{DataColumn, DataRow, DataValue};
3186
3187    fn create_test_table() -> Arc<DataTable> {
3188        let mut table = DataTable::new("test");
3189
3190        // Add columns
3191        table.add_column(DataColumn::new("id"));
3192        table.add_column(DataColumn::new("name"));
3193        table.add_column(DataColumn::new("age"));
3194
3195        // Add rows
3196        table
3197            .add_row(DataRow::new(vec![
3198                DataValue::Integer(1),
3199                DataValue::String("Alice".to_string()),
3200                DataValue::Integer(30),
3201            ]))
3202            .unwrap();
3203
3204        table
3205            .add_row(DataRow::new(vec![
3206                DataValue::Integer(2),
3207                DataValue::String("Bob".to_string()),
3208                DataValue::Integer(25),
3209            ]))
3210            .unwrap();
3211
3212        table
3213            .add_row(DataRow::new(vec![
3214                DataValue::Integer(3),
3215                DataValue::String("Charlie".to_string()),
3216                DataValue::Integer(35),
3217            ]))
3218            .unwrap();
3219
3220        Arc::new(table)
3221    }
3222
3223    #[test]
3224    fn test_select_all() {
3225        let table = create_test_table();
3226        let engine = QueryEngine::new();
3227
3228        let view = engine
3229            .execute(table.clone(), "SELECT * FROM users")
3230            .unwrap();
3231        assert_eq!(view.row_count(), 3);
3232        assert_eq!(view.column_count(), 3);
3233    }
3234
3235    #[test]
3236    fn test_select_columns() {
3237        let table = create_test_table();
3238        let engine = QueryEngine::new();
3239
3240        let view = engine
3241            .execute(table.clone(), "SELECT name, age FROM users")
3242            .unwrap();
3243        assert_eq!(view.row_count(), 3);
3244        assert_eq!(view.column_count(), 2);
3245    }
3246
3247    #[test]
3248    fn test_select_with_limit() {
3249        let table = create_test_table();
3250        let engine = QueryEngine::new();
3251
3252        let view = engine
3253            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3254            .unwrap();
3255        assert_eq!(view.row_count(), 2);
3256    }
3257
3258    #[test]
3259    fn test_type_coercion_contains() {
3260        // Initialize tracing for debug output
3261        let _ = tracing_subscriber::fmt()
3262            .with_max_level(tracing::Level::DEBUG)
3263            .try_init();
3264
3265        let mut table = DataTable::new("test");
3266        table.add_column(DataColumn::new("id"));
3267        table.add_column(DataColumn::new("status"));
3268        table.add_column(DataColumn::new("price"));
3269
3270        // Add test data with mixed types
3271        table
3272            .add_row(DataRow::new(vec![
3273                DataValue::Integer(1),
3274                DataValue::String("Pending".to_string()),
3275                DataValue::Float(99.99),
3276            ]))
3277            .unwrap();
3278
3279        table
3280            .add_row(DataRow::new(vec![
3281                DataValue::Integer(2),
3282                DataValue::String("Confirmed".to_string()),
3283                DataValue::Float(150.50),
3284            ]))
3285            .unwrap();
3286
3287        table
3288            .add_row(DataRow::new(vec![
3289                DataValue::Integer(3),
3290                DataValue::String("Pending".to_string()),
3291                DataValue::Float(75.00),
3292            ]))
3293            .unwrap();
3294
3295        let table = Arc::new(table);
3296        let engine = QueryEngine::new();
3297
3298        println!("\n=== Testing WHERE clause with Contains ===");
3299        println!("Table has {} rows", table.row_count());
3300        for i in 0..table.row_count() {
3301            let status = table.get_value(i, 1);
3302            println!("Row {i}: status = {status:?}");
3303        }
3304
3305        // Test 1: Basic string contains (should work)
3306        println!("\n--- Test 1: status.Contains('pend') ---");
3307        let result = engine.execute(
3308            table.clone(),
3309            "SELECT * FROM test WHERE status.Contains('pend')",
3310        );
3311        match result {
3312            Ok(view) => {
3313                println!("SUCCESS: Found {} matching rows", view.row_count());
3314                assert_eq!(view.row_count(), 2); // Should find both Pending rows
3315            }
3316            Err(e) => {
3317                panic!("Query failed: {e}");
3318            }
3319        }
3320
3321        // Test 2: Numeric contains (should work with type coercion)
3322        println!("\n--- Test 2: price.Contains('9') ---");
3323        let result = engine.execute(
3324            table.clone(),
3325            "SELECT * FROM test WHERE price.Contains('9')",
3326        );
3327        match result {
3328            Ok(view) => {
3329                println!(
3330                    "SUCCESS: Found {} matching rows with price containing '9'",
3331                    view.row_count()
3332                );
3333                // Should find 99.99 row
3334                assert!(view.row_count() >= 1);
3335            }
3336            Err(e) => {
3337                panic!("Numeric coercion query failed: {e}");
3338            }
3339        }
3340
3341        println!("\n=== All tests passed! ===");
3342    }
3343
3344    #[test]
3345    fn test_not_in_clause() {
3346        // Initialize tracing for debug output
3347        let _ = tracing_subscriber::fmt()
3348            .with_max_level(tracing::Level::DEBUG)
3349            .try_init();
3350
3351        let mut table = DataTable::new("test");
3352        table.add_column(DataColumn::new("id"));
3353        table.add_column(DataColumn::new("country"));
3354
3355        // Add test data
3356        table
3357            .add_row(DataRow::new(vec![
3358                DataValue::Integer(1),
3359                DataValue::String("CA".to_string()),
3360            ]))
3361            .unwrap();
3362
3363        table
3364            .add_row(DataRow::new(vec![
3365                DataValue::Integer(2),
3366                DataValue::String("US".to_string()),
3367            ]))
3368            .unwrap();
3369
3370        table
3371            .add_row(DataRow::new(vec![
3372                DataValue::Integer(3),
3373                DataValue::String("UK".to_string()),
3374            ]))
3375            .unwrap();
3376
3377        let table = Arc::new(table);
3378        let engine = QueryEngine::new();
3379
3380        println!("\n=== Testing NOT IN clause ===");
3381        println!("Table has {} rows", table.row_count());
3382        for i in 0..table.row_count() {
3383            let country = table.get_value(i, 1);
3384            println!("Row {i}: country = {country:?}");
3385        }
3386
3387        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
3388        println!("\n--- Test: country NOT IN ('CA') ---");
3389        let result = engine.execute(
3390            table.clone(),
3391            "SELECT * FROM test WHERE country NOT IN ('CA')",
3392        );
3393        match result {
3394            Ok(view) => {
3395                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3396                assert_eq!(view.row_count(), 2); // Should find US and UK
3397            }
3398            Err(e) => {
3399                panic!("NOT IN query failed: {e}");
3400            }
3401        }
3402
3403        println!("\n=== NOT IN test complete! ===");
3404    }
3405
3406    #[test]
3407    fn test_case_insensitive_in_and_not_in() {
3408        // Initialize tracing for debug output
3409        let _ = tracing_subscriber::fmt()
3410            .with_max_level(tracing::Level::DEBUG)
3411            .try_init();
3412
3413        let mut table = DataTable::new("test");
3414        table.add_column(DataColumn::new("id"));
3415        table.add_column(DataColumn::new("country"));
3416
3417        // Add test data with mixed case
3418        table
3419            .add_row(DataRow::new(vec![
3420                DataValue::Integer(1),
3421                DataValue::String("CA".to_string()), // uppercase
3422            ]))
3423            .unwrap();
3424
3425        table
3426            .add_row(DataRow::new(vec![
3427                DataValue::Integer(2),
3428                DataValue::String("us".to_string()), // lowercase
3429            ]))
3430            .unwrap();
3431
3432        table
3433            .add_row(DataRow::new(vec![
3434                DataValue::Integer(3),
3435                DataValue::String("UK".to_string()), // uppercase
3436            ]))
3437            .unwrap();
3438
3439        let table = Arc::new(table);
3440
3441        println!("\n=== Testing Case-Insensitive IN clause ===");
3442        println!("Table has {} rows", table.row_count());
3443        for i in 0..table.row_count() {
3444            let country = table.get_value(i, 1);
3445            println!("Row {i}: country = {country:?}");
3446        }
3447
3448        // Test case-insensitive IN - should match 'CA' with 'ca'
3449        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3450        let engine = QueryEngine::with_case_insensitive(true);
3451        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3452        match result {
3453            Ok(view) => {
3454                println!(
3455                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3456                    view.row_count()
3457                );
3458                assert_eq!(view.row_count(), 1); // Should find CA row
3459            }
3460            Err(e) => {
3461                panic!("Case-insensitive IN query failed: {e}");
3462            }
3463        }
3464
3465        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
3466        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3467        let result = engine.execute(
3468            table.clone(),
3469            "SELECT * FROM test WHERE country NOT IN ('ca')",
3470        );
3471        match result {
3472            Ok(view) => {
3473                println!(
3474                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3475                    view.row_count()
3476                );
3477                assert_eq!(view.row_count(), 2); // Should find us and UK rows
3478            }
3479            Err(e) => {
3480                panic!("Case-insensitive NOT IN query failed: {e}");
3481            }
3482        }
3483
3484        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
3485        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3486        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
3487        let result = engine_case_sensitive
3488            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3489        match result {
3490            Ok(view) => {
3491                println!(
3492                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3493                    view.row_count()
3494                );
3495                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
3496            }
3497            Err(e) => {
3498                panic!("Case-sensitive IN query failed: {e}");
3499            }
3500        }
3501
3502        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3503    }
3504
3505    #[test]
3506    #[ignore = "Parentheses in WHERE clause not yet implemented"]
3507    fn test_parentheses_in_where_clause() {
3508        // Initialize tracing for debug output
3509        let _ = tracing_subscriber::fmt()
3510            .with_max_level(tracing::Level::DEBUG)
3511            .try_init();
3512
3513        let mut table = DataTable::new("test");
3514        table.add_column(DataColumn::new("id"));
3515        table.add_column(DataColumn::new("status"));
3516        table.add_column(DataColumn::new("priority"));
3517
3518        // Add test data
3519        table
3520            .add_row(DataRow::new(vec![
3521                DataValue::Integer(1),
3522                DataValue::String("Pending".to_string()),
3523                DataValue::String("High".to_string()),
3524            ]))
3525            .unwrap();
3526
3527        table
3528            .add_row(DataRow::new(vec![
3529                DataValue::Integer(2),
3530                DataValue::String("Complete".to_string()),
3531                DataValue::String("High".to_string()),
3532            ]))
3533            .unwrap();
3534
3535        table
3536            .add_row(DataRow::new(vec![
3537                DataValue::Integer(3),
3538                DataValue::String("Pending".to_string()),
3539                DataValue::String("Low".to_string()),
3540            ]))
3541            .unwrap();
3542
3543        table
3544            .add_row(DataRow::new(vec![
3545                DataValue::Integer(4),
3546                DataValue::String("Complete".to_string()),
3547                DataValue::String("Low".to_string()),
3548            ]))
3549            .unwrap();
3550
3551        let table = Arc::new(table);
3552        let engine = QueryEngine::new();
3553
3554        println!("\n=== Testing Parentheses in WHERE clause ===");
3555        println!("Table has {} rows", table.row_count());
3556        for i in 0..table.row_count() {
3557            let status = table.get_value(i, 1);
3558            let priority = table.get_value(i, 2);
3559            println!("Row {i}: status = {status:?}, priority = {priority:?}");
3560        }
3561
3562        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
3563        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3564        let result = engine.execute(
3565            table.clone(),
3566            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3567        );
3568        match result {
3569            Ok(view) => {
3570                println!(
3571                    "SUCCESS: Found {} rows with parenthetical logic",
3572                    view.row_count()
3573                );
3574                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
3575            }
3576            Err(e) => {
3577                panic!("Parentheses query failed: {e}");
3578            }
3579        }
3580
3581        println!("\n=== Parentheses test complete! ===");
3582    }
3583
3584    #[test]
3585    #[ignore = "Numeric type coercion needs fixing"]
3586    fn test_numeric_type_coercion() {
3587        // Initialize tracing for debug output
3588        let _ = tracing_subscriber::fmt()
3589            .with_max_level(tracing::Level::DEBUG)
3590            .try_init();
3591
3592        let mut table = DataTable::new("test");
3593        table.add_column(DataColumn::new("id"));
3594        table.add_column(DataColumn::new("price"));
3595        table.add_column(DataColumn::new("quantity"));
3596
3597        // Add test data with different numeric types
3598        table
3599            .add_row(DataRow::new(vec![
3600                DataValue::Integer(1),
3601                DataValue::Float(99.50), // Contains '.'
3602                DataValue::Integer(100),
3603            ]))
3604            .unwrap();
3605
3606        table
3607            .add_row(DataRow::new(vec![
3608                DataValue::Integer(2),
3609                DataValue::Float(150.0), // Contains '.' and '0'
3610                DataValue::Integer(200),
3611            ]))
3612            .unwrap();
3613
3614        table
3615            .add_row(DataRow::new(vec![
3616                DataValue::Integer(3),
3617                DataValue::Integer(75), // No decimal point
3618                DataValue::Integer(50),
3619            ]))
3620            .unwrap();
3621
3622        let table = Arc::new(table);
3623        let engine = QueryEngine::new();
3624
3625        println!("\n=== Testing Numeric Type Coercion ===");
3626        println!("Table has {} rows", table.row_count());
3627        for i in 0..table.row_count() {
3628            let price = table.get_value(i, 1);
3629            let quantity = table.get_value(i, 2);
3630            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3631        }
3632
3633        // Test Contains on float values - should find rows with decimal points
3634        println!("\n--- Test: price.Contains('.') ---");
3635        let result = engine.execute(
3636            table.clone(),
3637            "SELECT * FROM test WHERE price.Contains('.')",
3638        );
3639        match result {
3640            Ok(view) => {
3641                println!(
3642                    "SUCCESS: Found {} rows with decimal points in price",
3643                    view.row_count()
3644                );
3645                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
3646            }
3647            Err(e) => {
3648                panic!("Numeric Contains query failed: {e}");
3649            }
3650        }
3651
3652        // Test Contains on integer values converted to string
3653        println!("\n--- Test: quantity.Contains('0') ---");
3654        let result = engine.execute(
3655            table.clone(),
3656            "SELECT * FROM test WHERE quantity.Contains('0')",
3657        );
3658        match result {
3659            Ok(view) => {
3660                println!(
3661                    "SUCCESS: Found {} rows with '0' in quantity",
3662                    view.row_count()
3663                );
3664                assert_eq!(view.row_count(), 2); // Should find 100 and 200
3665            }
3666            Err(e) => {
3667                panic!("Integer Contains query failed: {e}");
3668            }
3669        }
3670
3671        println!("\n=== Numeric type coercion test complete! ===");
3672    }
3673
3674    #[test]
3675    fn test_datetime_comparisons() {
3676        // Initialize tracing for debug output
3677        let _ = tracing_subscriber::fmt()
3678            .with_max_level(tracing::Level::DEBUG)
3679            .try_init();
3680
3681        let mut table = DataTable::new("test");
3682        table.add_column(DataColumn::new("id"));
3683        table.add_column(DataColumn::new("created_date"));
3684
3685        // Add test data with date strings (as they would come from CSV)
3686        table
3687            .add_row(DataRow::new(vec![
3688                DataValue::Integer(1),
3689                DataValue::String("2024-12-15".to_string()),
3690            ]))
3691            .unwrap();
3692
3693        table
3694            .add_row(DataRow::new(vec![
3695                DataValue::Integer(2),
3696                DataValue::String("2025-01-15".to_string()),
3697            ]))
3698            .unwrap();
3699
3700        table
3701            .add_row(DataRow::new(vec![
3702                DataValue::Integer(3),
3703                DataValue::String("2025-02-15".to_string()),
3704            ]))
3705            .unwrap();
3706
3707        let table = Arc::new(table);
3708        let engine = QueryEngine::new();
3709
3710        println!("\n=== Testing DateTime Comparisons ===");
3711        println!("Table has {} rows", table.row_count());
3712        for i in 0..table.row_count() {
3713            let date = table.get_value(i, 1);
3714            println!("Row {i}: created_date = {date:?}");
3715        }
3716
3717        // Test DateTime constructor comparison - should find dates after 2025-01-01
3718        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3719        let result = engine.execute(
3720            table.clone(),
3721            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3722        );
3723        match result {
3724            Ok(view) => {
3725                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3726                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
3727            }
3728            Err(e) => {
3729                panic!("DateTime comparison query failed: {e}");
3730            }
3731        }
3732
3733        println!("\n=== DateTime comparison test complete! ===");
3734    }
3735
3736    #[test]
3737    fn test_not_with_method_calls() {
3738        // Initialize tracing for debug output
3739        let _ = tracing_subscriber::fmt()
3740            .with_max_level(tracing::Level::DEBUG)
3741            .try_init();
3742
3743        let mut table = DataTable::new("test");
3744        table.add_column(DataColumn::new("id"));
3745        table.add_column(DataColumn::new("status"));
3746
3747        // Add test data
3748        table
3749            .add_row(DataRow::new(vec![
3750                DataValue::Integer(1),
3751                DataValue::String("Pending Review".to_string()),
3752            ]))
3753            .unwrap();
3754
3755        table
3756            .add_row(DataRow::new(vec![
3757                DataValue::Integer(2),
3758                DataValue::String("Complete".to_string()),
3759            ]))
3760            .unwrap();
3761
3762        table
3763            .add_row(DataRow::new(vec![
3764                DataValue::Integer(3),
3765                DataValue::String("Pending Approval".to_string()),
3766            ]))
3767            .unwrap();
3768
3769        let table = Arc::new(table);
3770        let engine = QueryEngine::with_case_insensitive(true);
3771
3772        println!("\n=== Testing NOT with Method Calls ===");
3773        println!("Table has {} rows", table.row_count());
3774        for i in 0..table.row_count() {
3775            let status = table.get_value(i, 1);
3776            println!("Row {i}: status = {status:?}");
3777        }
3778
3779        // Test NOT with Contains - should exclude rows containing "pend"
3780        println!("\n--- Test: NOT status.Contains('pend') ---");
3781        let result = engine.execute(
3782            table.clone(),
3783            "SELECT * FROM test WHERE NOT status.Contains('pend')",
3784        );
3785        match result {
3786            Ok(view) => {
3787                println!(
3788                    "SUCCESS: Found {} rows NOT containing 'pend'",
3789                    view.row_count()
3790                );
3791                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3792            }
3793            Err(e) => {
3794                panic!("NOT Contains query failed: {e}");
3795            }
3796        }
3797
3798        // Test NOT with StartsWith
3799        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3800        let result = engine.execute(
3801            table.clone(),
3802            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3803        );
3804        match result {
3805            Ok(view) => {
3806                println!(
3807                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
3808                    view.row_count()
3809                );
3810                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3811            }
3812            Err(e) => {
3813                panic!("NOT StartsWith query failed: {e}");
3814            }
3815        }
3816
3817        println!("\n=== NOT with method calls test complete! ===");
3818    }
3819
3820    #[test]
3821    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3822    fn test_complex_logical_expressions() {
3823        // Initialize tracing for debug output
3824        let _ = tracing_subscriber::fmt()
3825            .with_max_level(tracing::Level::DEBUG)
3826            .try_init();
3827
3828        let mut table = DataTable::new("test");
3829        table.add_column(DataColumn::new("id"));
3830        table.add_column(DataColumn::new("status"));
3831        table.add_column(DataColumn::new("priority"));
3832        table.add_column(DataColumn::new("assigned"));
3833
3834        // Add comprehensive test data
3835        table
3836            .add_row(DataRow::new(vec![
3837                DataValue::Integer(1),
3838                DataValue::String("Pending".to_string()),
3839                DataValue::String("High".to_string()),
3840                DataValue::String("John".to_string()),
3841            ]))
3842            .unwrap();
3843
3844        table
3845            .add_row(DataRow::new(vec![
3846                DataValue::Integer(2),
3847                DataValue::String("Complete".to_string()),
3848                DataValue::String("High".to_string()),
3849                DataValue::String("Jane".to_string()),
3850            ]))
3851            .unwrap();
3852
3853        table
3854            .add_row(DataRow::new(vec![
3855                DataValue::Integer(3),
3856                DataValue::String("Pending".to_string()),
3857                DataValue::String("Low".to_string()),
3858                DataValue::String("John".to_string()),
3859            ]))
3860            .unwrap();
3861
3862        table
3863            .add_row(DataRow::new(vec![
3864                DataValue::Integer(4),
3865                DataValue::String("In Progress".to_string()),
3866                DataValue::String("Medium".to_string()),
3867                DataValue::String("Jane".to_string()),
3868            ]))
3869            .unwrap();
3870
3871        let table = Arc::new(table);
3872        let engine = QueryEngine::new();
3873
3874        println!("\n=== Testing Complex Logical Expressions ===");
3875        println!("Table has {} rows", table.row_count());
3876        for i in 0..table.row_count() {
3877            let status = table.get_value(i, 1);
3878            let priority = table.get_value(i, 2);
3879            let assigned = table.get_value(i, 3);
3880            println!(
3881                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3882            );
3883        }
3884
3885        // Test complex AND/OR logic
3886        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3887        let result = engine.execute(
3888            table.clone(),
3889            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3890        );
3891        match result {
3892            Ok(view) => {
3893                println!(
3894                    "SUCCESS: Found {} rows with complex logic",
3895                    view.row_count()
3896                );
3897                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3898            }
3899            Err(e) => {
3900                panic!("Complex logic query failed: {e}");
3901            }
3902        }
3903
3904        // Test NOT with complex expressions
3905        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3906        let result = engine.execute(
3907            table.clone(),
3908            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3909        );
3910        match result {
3911            Ok(view) => {
3912                println!(
3913                    "SUCCESS: Found {} rows with NOT complex logic",
3914                    view.row_count()
3915                );
3916                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3917            }
3918            Err(e) => {
3919                panic!("NOT complex logic query failed: {e}");
3920            }
3921        }
3922
3923        println!("\n=== Complex logical expressions test complete! ===");
3924    }
3925
3926    #[test]
3927    fn test_mixed_data_types_and_edge_cases() {
3928        // Initialize tracing for debug output
3929        let _ = tracing_subscriber::fmt()
3930            .with_max_level(tracing::Level::DEBUG)
3931            .try_init();
3932
3933        let mut table = DataTable::new("test");
3934        table.add_column(DataColumn::new("id"));
3935        table.add_column(DataColumn::new("value"));
3936        table.add_column(DataColumn::new("nullable_field"));
3937
3938        // Add test data with mixed types and edge cases
3939        table
3940            .add_row(DataRow::new(vec![
3941                DataValue::Integer(1),
3942                DataValue::String("123.45".to_string()),
3943                DataValue::String("present".to_string()),
3944            ]))
3945            .unwrap();
3946
3947        table
3948            .add_row(DataRow::new(vec![
3949                DataValue::Integer(2),
3950                DataValue::Float(678.90),
3951                DataValue::Null,
3952            ]))
3953            .unwrap();
3954
3955        table
3956            .add_row(DataRow::new(vec![
3957                DataValue::Integer(3),
3958                DataValue::Boolean(true),
3959                DataValue::String("also present".to_string()),
3960            ]))
3961            .unwrap();
3962
3963        table
3964            .add_row(DataRow::new(vec![
3965                DataValue::Integer(4),
3966                DataValue::String("false".to_string()),
3967                DataValue::Null,
3968            ]))
3969            .unwrap();
3970
3971        let table = Arc::new(table);
3972        let engine = QueryEngine::new();
3973
3974        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3975        println!("Table has {} rows", table.row_count());
3976        for i in 0..table.row_count() {
3977            let value = table.get_value(i, 1);
3978            let nullable = table.get_value(i, 2);
3979            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3980        }
3981
3982        // Test type coercion with boolean Contains
3983        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3984        let result = engine.execute(
3985            table.clone(),
3986            "SELECT * FROM test WHERE value.Contains('true')",
3987        );
3988        match result {
3989            Ok(view) => {
3990                println!(
3991                    "SUCCESS: Found {} rows with boolean coercion",
3992                    view.row_count()
3993                );
3994                assert_eq!(view.row_count(), 1); // Should find the boolean true row
3995            }
3996            Err(e) => {
3997                panic!("Boolean coercion query failed: {e}");
3998            }
3999        }
4000
4001        // Test multiple IN values with mixed types
4002        println!("\n--- Test: id IN (1, 3) ---");
4003        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
4004        match result {
4005            Ok(view) => {
4006                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
4007                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
4008            }
4009            Err(e) => {
4010                panic!("Multiple IN values query failed: {e}");
4011            }
4012        }
4013
4014        println!("\n=== Mixed data types test complete! ===");
4015    }
4016
4017    /// Test that aggregate-only queries return exactly one row (regression test)
4018    #[test]
4019    fn test_aggregate_only_single_row() {
4020        let table = create_test_stock_data();
4021        let engine = QueryEngine::new();
4022
4023        // Test query with multiple aggregates - should return exactly 1 row
4024        let result = engine
4025            .execute(
4026                table.clone(),
4027                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
4028            )
4029            .expect("Query should succeed");
4030
4031        assert_eq!(
4032            result.row_count(),
4033            1,
4034            "Aggregate-only query should return exactly 1 row"
4035        );
4036        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
4037
4038        // Verify the actual values are correct
4039        let source = result.source();
4040        let row = source.get_row(0).expect("Should have first row");
4041
4042        // COUNT(*) should be 5 (total rows)
4043        assert_eq!(row.values[0], DataValue::Integer(5));
4044
4045        // MIN should be 99.5
4046        assert_eq!(row.values[1], DataValue::Float(99.5));
4047
4048        // MAX should be 105.0
4049        assert_eq!(row.values[2], DataValue::Float(105.0));
4050
4051        // AVG should be approximately 102.4
4052        if let DataValue::Float(avg) = &row.values[3] {
4053            assert!(
4054                (avg - 102.4).abs() < 0.01,
4055                "Average should be approximately 102.4, got {}",
4056                avg
4057            );
4058        } else {
4059            panic!("AVG should return a Float value");
4060        }
4061    }
4062
4063    /// Test single aggregate function returns single row
4064    #[test]
4065    fn test_single_aggregate_single_row() {
4066        let table = create_test_stock_data();
4067        let engine = QueryEngine::new();
4068
4069        let result = engine
4070            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4071            .expect("Query should succeed");
4072
4073        assert_eq!(
4074            result.row_count(),
4075            1,
4076            "Single aggregate query should return exactly 1 row"
4077        );
4078        assert_eq!(result.column_count(), 1, "Should have 1 column");
4079
4080        let source = result.source();
4081        let row = source.get_row(0).expect("Should have first row");
4082        assert_eq!(row.values[0], DataValue::Integer(5));
4083    }
4084
4085    /// Test aggregate with WHERE clause filtering
4086    #[test]
4087    fn test_aggregate_with_where_single_row() {
4088        let table = create_test_stock_data();
4089        let engine = QueryEngine::new();
4090
4091        // Filter to only high-value stocks (>= 103.0) and aggregate
4092        let result = engine
4093            .execute(
4094                table.clone(),
4095                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4096            )
4097            .expect("Query should succeed");
4098
4099        assert_eq!(
4100            result.row_count(),
4101            1,
4102            "Filtered aggregate query should return exactly 1 row"
4103        );
4104        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4105
4106        let source = result.source();
4107        let row = source.get_row(0).expect("Should have first row");
4108
4109        // Should find 2 rows (103.5 and 105.0)
4110        assert_eq!(row.values[0], DataValue::Integer(2));
4111        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
4112        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
4113    }
4114
4115    #[test]
4116    fn test_not_in_parsing() {
4117        use crate::sql::recursive_parser::Parser;
4118
4119        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4120        println!("\n=== Testing NOT IN parsing ===");
4121        println!("Parsing query: {query}");
4122
4123        let mut parser = Parser::new(query);
4124        match parser.parse() {
4125            Ok(statement) => {
4126                println!("Parsed statement: {statement:#?}");
4127                if let Some(where_clause) = statement.where_clause {
4128                    println!("WHERE conditions: {:#?}", where_clause.conditions);
4129                    if let Some(first_condition) = where_clause.conditions.first() {
4130                        println!("First condition expression: {:#?}", first_condition.expr);
4131                    }
4132                }
4133            }
4134            Err(e) => {
4135                panic!("Parse error: {e}");
4136            }
4137        }
4138    }
4139
4140    /// Create test stock data for aggregate testing
4141    fn create_test_stock_data() -> Arc<DataTable> {
4142        let mut table = DataTable::new("stock");
4143
4144        table.add_column(DataColumn::new("symbol"));
4145        table.add_column(DataColumn::new("close"));
4146        table.add_column(DataColumn::new("volume"));
4147
4148        // Add 5 rows of test data
4149        let test_data = vec![
4150            ("AAPL", 99.5, 1000),
4151            ("AAPL", 101.2, 1500),
4152            ("AAPL", 103.5, 2000),
4153            ("AAPL", 105.0, 1200),
4154            ("AAPL", 102.8, 1800),
4155        ];
4156
4157        for (symbol, close, volume) in test_data {
4158            table
4159                .add_row(DataRow::new(vec![
4160                    DataValue::String(symbol.to_string()),
4161                    DataValue::Float(close),
4162                    DataValue::Integer(volume),
4163                ]))
4164                .expect("Should add row successfully");
4165        }
4166
4167        Arc::new(table)
4168    }
4169}
4170
4171#[cfg(test)]
4172#[path = "query_engine_tests.rs"]
4173mod query_engine_tests;