Skip to main content

sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29    TableFunction,
30};
31
32/// Look up a CTE by name with case-insensitive fallback.
33/// Exact match is tried first (fast path); if that fails, a case-insensitive
34/// scan finds tables like `Orders` when the query references `orders`.
35/// This matches MySQL/PostgreSQL behaviour for unquoted identifiers.
36fn resolve_cte<'a>(
37    context: &'a HashMap<String, Arc<DataView>>,
38    name: &str,
39) -> Option<&'a Arc<DataView>> {
40    if let Some(v) = context.get(name) {
41        return Some(v);
42    }
43    let lower = name.to_lowercase();
44    context
45        .iter()
46        .find(|(k, _)| k.to_lowercase() == lower)
47        .map(|(_, v)| v)
48}
49
50/// Execution context for tracking table aliases and scope during query execution
51#[derive(Debug, Clone)]
52pub struct ExecutionContext {
53    /// Map from alias to actual table/CTE name
54    /// Example: "t" -> "#tmp_trades", "a" -> "data"
55    alias_map: HashMap<String, String>,
56}
57
58impl ExecutionContext {
59    /// Create a new empty execution context
60    pub fn new() -> Self {
61        Self {
62            alias_map: HashMap::new(),
63        }
64    }
65
66    /// Register a table alias
67    pub fn register_alias(&mut self, alias: String, table_name: String) {
68        debug!("Registering alias: {} -> {}", alias, table_name);
69        self.alias_map.insert(alias, table_name);
70    }
71
72    /// Resolve an alias to its actual table name
73    /// Returns the alias itself if not found in the map
74    pub fn resolve_alias(&self, name: &str) -> String {
75        self.alias_map
76            .get(name)
77            .cloned()
78            .unwrap_or_else(|| name.to_string())
79    }
80
81    /// Check if a name is a registered alias
82    pub fn is_alias(&self, name: &str) -> bool {
83        self.alias_map.contains_key(name)
84    }
85
86    /// Get a copy of all registered aliases
87    pub fn get_aliases(&self) -> HashMap<String, String> {
88        self.alias_map.clone()
89    }
90
91    /// Resolve a column reference to its index in the table, handling aliases
92    ///
93    /// This is the unified column resolution function that should be used by all
94    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
95    /// alias resolution behavior.
96    ///
97    /// Resolution strategy:
98    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
99    ///    a. Resolve the alias: t -> actual_table_name
100    ///    b. Try qualified lookup: "actual_table_name.amount"
101    ///    c. Fall back to unqualified: "amount"
102    /// 2. If column_ref has no prefix:
103    ///    a. Try simple column name lookup: "amount"
104    ///    b. Try as qualified name if it contains a dot: "table.column"
105    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
106        if let Some(table_prefix) = &column_ref.table_prefix {
107            // Qualified column reference: resolve the alias first
108            let actual_table = self.resolve_alias(table_prefix);
109
110            // Try qualified lookup: "actual_table.column"
111            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
112            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
113                debug!(
114                    "Resolved {}.{} -> qualified column '{}' at index {}",
115                    table_prefix, column_ref.name, qualified_name, idx
116                );
117                return Ok(idx);
118            }
119
120            // Fall back to unqualified lookup
121            if let Some(idx) = table.get_column_index(&column_ref.name) {
122                debug!(
123                    "Resolved {}.{} -> unqualified column '{}' at index {}",
124                    table_prefix, column_ref.name, column_ref.name, idx
125                );
126                return Ok(idx);
127            }
128
129            // Not found with either qualified or unqualified name
130            Err(anyhow!(
131                "Column '{}' not found. Table '{}' may not support qualified column names",
132                qualified_name,
133                actual_table
134            ))
135        } else {
136            // Unqualified column reference
137            if let Some(idx) = table.get_column_index(&column_ref.name) {
138                debug!(
139                    "Resolved unqualified column '{}' at index {}",
140                    column_ref.name, idx
141                );
142                return Ok(idx);
143            }
144
145            // If the column name contains a dot, try it as a qualified name
146            if column_ref.name.contains('.') {
147                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
148                    debug!(
149                        "Resolved '{}' as qualified column at index {}",
150                        column_ref.name, idx
151                    );
152                    return Ok(idx);
153                }
154            }
155
156            // Column not found - provide helpful error
157            let suggestion = self.find_similar_column(table, &column_ref.name);
158            match suggestion {
159                Some(similar) => Err(anyhow!(
160                    "Column '{}' not found. Did you mean '{}'?",
161                    column_ref.name,
162                    similar
163                )),
164                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
165            }
166        }
167    }
168
169    /// Find a similar column name using edit distance (for better error messages)
170    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
171        let columns = table.column_names();
172        let mut best_match: Option<(String, usize)> = None;
173
174        for col in columns {
175            let distance = edit_distance(name, &col);
176            if distance <= 2 {
177                // Allow up to 2 character differences
178                match best_match {
179                    Some((_, best_dist)) if distance < best_dist => {
180                        best_match = Some((col.clone(), distance));
181                    }
182                    None => {
183                        best_match = Some((col.clone(), distance));
184                    }
185                    _ => {}
186                }
187            }
188        }
189
190        best_match.map(|(name, _)| name)
191    }
192}
193
194impl Default for ExecutionContext {
195    fn default() -> Self {
196        Self::new()
197    }
198}
199
200/// Calculate edit distance between two strings (Levenshtein distance)
201fn edit_distance(a: &str, b: &str) -> usize {
202    let len_a = a.chars().count();
203    let len_b = b.chars().count();
204
205    if len_a == 0 {
206        return len_b;
207    }
208    if len_b == 0 {
209        return len_a;
210    }
211
212    let mut matrix = vec![vec![0; len_b + 1]; len_a + 1];
213
214    for i in 0..=len_a {
215        matrix[i][0] = i;
216    }
217    for j in 0..=len_b {
218        matrix[0][j] = j;
219    }
220
221    let a_chars: Vec<char> = a.chars().collect();
222    let b_chars: Vec<char> = b.chars().collect();
223
224    for i in 1..=len_a {
225        for j in 1..=len_b {
226            let cost = if a_chars[i - 1] == b_chars[j - 1] {
227                0
228            } else {
229                1
230            };
231            matrix[i][j] = min(
232                min(matrix[i - 1][j] + 1, matrix[i][j - 1] + 1),
233                matrix[i - 1][j - 1] + cost,
234            );
235        }
236    }
237
238    matrix[len_a][len_b]
239}
240
241/// Query engine that executes SQL directly on `DataTable`
242#[derive(Clone)]
243pub struct QueryEngine {
244    case_insensitive: bool,
245    date_notation: String,
246    _behavior_config: Option<BehaviorConfig>,
247}
248
249impl Default for QueryEngine {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255impl QueryEngine {
256    #[must_use]
257    pub fn new() -> Self {
258        Self {
259            case_insensitive: false,
260            date_notation: get_date_notation(),
261            _behavior_config: None,
262        }
263    }
264
265    #[must_use]
266    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
267        let case_insensitive = config.case_insensitive_default;
268        // Use get_date_notation() to respect environment variable override
269        let date_notation = get_date_notation();
270        Self {
271            case_insensitive,
272            date_notation,
273            _behavior_config: Some(config),
274        }
275    }
276
277    #[must_use]
278    pub fn with_date_notation(_date_notation: String) -> Self {
279        Self {
280            case_insensitive: false,
281            date_notation: get_date_notation(), // Always use the global function
282            _behavior_config: None,
283        }
284    }
285
286    #[must_use]
287    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
288        Self {
289            case_insensitive,
290            date_notation: get_date_notation(),
291            _behavior_config: None,
292        }
293    }
294
295    #[must_use]
296    pub fn with_case_insensitive_and_date_notation(
297        case_insensitive: bool,
298        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
299    ) -> Self {
300        Self {
301            case_insensitive,
302            date_notation: get_date_notation(), // Always use the global function
303            _behavior_config: None,
304        }
305    }
306
307    /// Find a column name similar to the given name using edit distance
308    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
309        let columns = table.column_names();
310        let mut best_match: Option<(String, usize)> = None;
311
312        for col in columns {
313            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
314            // Only suggest if distance is small (likely a typo)
315            // Allow up to 3 edits for longer names
316            let max_distance = if name.len() > 10 { 3 } else { 2 };
317            if distance <= max_distance {
318                match &best_match {
319                    None => best_match = Some((col, distance)),
320                    Some((_, best_dist)) if distance < *best_dist => {
321                        best_match = Some((col, distance));
322                    }
323                    _ => {}
324                }
325            }
326        }
327
328        best_match.map(|(name, _)| name)
329    }
330
331    /// Calculate Levenshtein edit distance between two strings
332    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
333        let len1 = s1.len();
334        let len2 = s2.len();
335        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
336
337        for i in 0..=len1 {
338            matrix[i][0] = i;
339        }
340        for j in 0..=len2 {
341            matrix[0][j] = j;
342        }
343
344        for (i, c1) in s1.chars().enumerate() {
345            for (j, c2) in s2.chars().enumerate() {
346                let cost = usize::from(c1 != c2);
347                matrix[i + 1][j + 1] = std::cmp::min(
348                    matrix[i][j + 1] + 1, // deletion
349                    std::cmp::min(
350                        matrix[i + 1][j] + 1, // insertion
351                        matrix[i][j] + cost,  // substitution
352                    ),
353                );
354            }
355        }
356
357        matrix[len1][len2]
358    }
359
360    /// Check if an expression contains UNNEST function call
361    fn contains_unnest(expr: &SqlExpression) -> bool {
362        match expr {
363            // Direct UNNEST variant
364            SqlExpression::Unnest { .. } => true,
365            SqlExpression::FunctionCall { name, args, .. } => {
366                if name.to_uppercase() == "UNNEST" {
367                    return true;
368                }
369                // Check recursively in function arguments
370                args.iter().any(Self::contains_unnest)
371            }
372            SqlExpression::BinaryOp { left, right, .. } => {
373                Self::contains_unnest(left) || Self::contains_unnest(right)
374            }
375            SqlExpression::Not { expr } => Self::contains_unnest(expr),
376            SqlExpression::CaseExpression {
377                when_branches,
378                else_branch,
379            } => {
380                when_branches.iter().any(|branch| {
381                    Self::contains_unnest(&branch.condition)
382                        || Self::contains_unnest(&branch.result)
383                }) || else_branch
384                    .as_ref()
385                    .map_or(false, |e| Self::contains_unnest(e))
386            }
387            SqlExpression::SimpleCaseExpression {
388                expr,
389                when_branches,
390                else_branch,
391            } => {
392                Self::contains_unnest(expr)
393                    || when_branches.iter().any(|branch| {
394                        Self::contains_unnest(&branch.value)
395                            || Self::contains_unnest(&branch.result)
396                    })
397                    || else_branch
398                        .as_ref()
399                        .map_or(false, |e| Self::contains_unnest(e))
400            }
401            SqlExpression::InList { expr, values } => {
402                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
403            }
404            SqlExpression::NotInList { expr, values } => {
405                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
406            }
407            SqlExpression::Between { expr, lower, upper } => {
408                Self::contains_unnest(expr)
409                    || Self::contains_unnest(lower)
410                    || Self::contains_unnest(upper)
411            }
412            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
413            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
414            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
415            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
416            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
417            SqlExpression::ChainedMethodCall { base, args, .. } => {
418                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
419            }
420            _ => false,
421        }
422    }
423
424    /// Collect all WindowSpecs from an expression (helper for pre-creating contexts)
425    fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
426        match expr {
427            SqlExpression::WindowFunction {
428                window_spec, args, ..
429            } => {
430                // Add this window spec
431                specs.push(window_spec.clone());
432                // Recursively check arguments
433                for arg in args {
434                    Self::collect_window_specs(arg, specs);
435                }
436            }
437            SqlExpression::BinaryOp { left, right, .. } => {
438                Self::collect_window_specs(left, specs);
439                Self::collect_window_specs(right, specs);
440            }
441            SqlExpression::Not { expr } => {
442                Self::collect_window_specs(expr, specs);
443            }
444            SqlExpression::FunctionCall { args, .. } => {
445                for arg in args {
446                    Self::collect_window_specs(arg, specs);
447                }
448            }
449            SqlExpression::CaseExpression {
450                when_branches,
451                else_branch,
452            } => {
453                for branch in when_branches {
454                    Self::collect_window_specs(&branch.condition, specs);
455                    Self::collect_window_specs(&branch.result, specs);
456                }
457                if let Some(else_expr) = else_branch {
458                    Self::collect_window_specs(else_expr, specs);
459                }
460            }
461            SqlExpression::SimpleCaseExpression {
462                expr,
463                when_branches,
464                else_branch,
465            } => {
466                Self::collect_window_specs(expr, specs);
467                for branch in when_branches {
468                    Self::collect_window_specs(&branch.value, specs);
469                    Self::collect_window_specs(&branch.result, specs);
470                }
471                if let Some(else_expr) = else_branch {
472                    Self::collect_window_specs(else_expr, specs);
473                }
474            }
475            SqlExpression::InList { expr, values, .. } => {
476                Self::collect_window_specs(expr, specs);
477                for item in values {
478                    Self::collect_window_specs(item, specs);
479                }
480            }
481            SqlExpression::ChainedMethodCall { base, args, .. } => {
482                Self::collect_window_specs(base, specs);
483                for arg in args {
484                    Self::collect_window_specs(arg, specs);
485                }
486            }
487            // Leaf nodes - no recursion needed
488            SqlExpression::Column(_)
489            | SqlExpression::NumberLiteral(_)
490            | SqlExpression::StringLiteral(_)
491            | SqlExpression::BooleanLiteral(_)
492            | SqlExpression::Null
493            | SqlExpression::DateTimeToday { .. }
494            | SqlExpression::DateTimeConstructor { .. }
495            | SqlExpression::MethodCall { .. } => {}
496            // Catch-all for any other variants
497            _ => {}
498        }
499    }
500
501    /// Check if an expression contains a window function
502    fn contains_window_function(expr: &SqlExpression) -> bool {
503        match expr {
504            SqlExpression::WindowFunction { .. } => true,
505            SqlExpression::BinaryOp { left, right, .. } => {
506                Self::contains_window_function(left) || Self::contains_window_function(right)
507            }
508            SqlExpression::Not { expr } => Self::contains_window_function(expr),
509            SqlExpression::FunctionCall { args, .. } => {
510                args.iter().any(Self::contains_window_function)
511            }
512            SqlExpression::CaseExpression {
513                when_branches,
514                else_branch,
515            } => {
516                when_branches.iter().any(|branch| {
517                    Self::contains_window_function(&branch.condition)
518                        || Self::contains_window_function(&branch.result)
519                }) || else_branch
520                    .as_ref()
521                    .map_or(false, |e| Self::contains_window_function(e))
522            }
523            SqlExpression::SimpleCaseExpression {
524                expr,
525                when_branches,
526                else_branch,
527            } => {
528                Self::contains_window_function(expr)
529                    || when_branches.iter().any(|branch| {
530                        Self::contains_window_function(&branch.value)
531                            || Self::contains_window_function(&branch.result)
532                    })
533                    || else_branch
534                        .as_ref()
535                        .map_or(false, |e| Self::contains_window_function(e))
536            }
537            SqlExpression::InList { expr, values } => {
538                Self::contains_window_function(expr)
539                    || values.iter().any(Self::contains_window_function)
540            }
541            SqlExpression::NotInList { expr, values } => {
542                Self::contains_window_function(expr)
543                    || values.iter().any(Self::contains_window_function)
544            }
545            SqlExpression::Between { expr, lower, upper } => {
546                Self::contains_window_function(expr)
547                    || Self::contains_window_function(lower)
548                    || Self::contains_window_function(upper)
549            }
550            SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
551            SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
552            SqlExpression::MethodCall { args, .. } => {
553                args.iter().any(Self::contains_window_function)
554            }
555            SqlExpression::ChainedMethodCall { base, args, .. } => {
556                Self::contains_window_function(base)
557                    || args.iter().any(Self::contains_window_function)
558            }
559            _ => false,
560        }
561    }
562
563    /// Extract all window function specifications from select items
564    fn extract_window_specs(
565        items: &[SelectItem],
566    ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
567        let mut specs = Vec::new();
568        for (idx, item) in items.iter().enumerate() {
569            if let SelectItem::Expression { expr, .. } = item {
570                Self::collect_window_function_specs(expr, idx, &mut specs);
571            }
572        }
573        specs
574    }
575
576    /// Recursively collect window function specs from an expression
577    fn collect_window_function_specs(
578        expr: &SqlExpression,
579        output_column_index: usize,
580        specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
581    ) {
582        match expr {
583            SqlExpression::WindowFunction {
584                name,
585                args,
586                window_spec,
587            } => {
588                specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
589                    spec: window_spec.clone(),
590                    function_name: name.clone(),
591                    args: args.clone(),
592                    output_column_index,
593                });
594            }
595            SqlExpression::BinaryOp { left, right, .. } => {
596                Self::collect_window_function_specs(left, output_column_index, specs);
597                Self::collect_window_function_specs(right, output_column_index, specs);
598            }
599            SqlExpression::Not { expr } => {
600                Self::collect_window_function_specs(expr, output_column_index, specs);
601            }
602            SqlExpression::FunctionCall { args, .. } => {
603                for arg in args {
604                    Self::collect_window_function_specs(arg, output_column_index, specs);
605                }
606            }
607            SqlExpression::CaseExpression {
608                when_branches,
609                else_branch,
610            } => {
611                for branch in when_branches {
612                    Self::collect_window_function_specs(
613                        &branch.condition,
614                        output_column_index,
615                        specs,
616                    );
617                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
618                }
619                if let Some(e) = else_branch {
620                    Self::collect_window_function_specs(e, output_column_index, specs);
621                }
622            }
623            SqlExpression::SimpleCaseExpression {
624                expr,
625                when_branches,
626                else_branch,
627            } => {
628                Self::collect_window_function_specs(expr, output_column_index, specs);
629                for branch in when_branches {
630                    Self::collect_window_function_specs(&branch.value, output_column_index, specs);
631                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
632                }
633                if let Some(e) = else_branch {
634                    Self::collect_window_function_specs(e, output_column_index, specs);
635                }
636            }
637            SqlExpression::InList { expr, values } => {
638                Self::collect_window_function_specs(expr, output_column_index, specs);
639                for val in values {
640                    Self::collect_window_function_specs(val, output_column_index, specs);
641                }
642            }
643            SqlExpression::NotInList { expr, values } => {
644                Self::collect_window_function_specs(expr, output_column_index, specs);
645                for val in values {
646                    Self::collect_window_function_specs(val, output_column_index, specs);
647                }
648            }
649            SqlExpression::Between { expr, lower, upper } => {
650                Self::collect_window_function_specs(expr, output_column_index, specs);
651                Self::collect_window_function_specs(lower, output_column_index, specs);
652                Self::collect_window_function_specs(upper, output_column_index, specs);
653            }
654            SqlExpression::InSubquery { expr, .. } => {
655                Self::collect_window_function_specs(expr, output_column_index, specs);
656            }
657            SqlExpression::NotInSubquery { expr, .. } => {
658                Self::collect_window_function_specs(expr, output_column_index, specs);
659            }
660            SqlExpression::MethodCall { args, .. } => {
661                for arg in args {
662                    Self::collect_window_function_specs(arg, output_column_index, specs);
663                }
664            }
665            SqlExpression::ChainedMethodCall { base, args, .. } => {
666                Self::collect_window_function_specs(base, output_column_index, specs);
667                for arg in args {
668                    Self::collect_window_function_specs(arg, output_column_index, specs);
669                }
670            }
671            _ => {} // Other expression types don't contain window functions
672        }
673    }
674
675    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
676    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
677        let (view, _plan) = self.execute_with_plan(table, sql)?;
678        Ok(view)
679    }
680
681    /// Execute a SQL query with optional temp table registry access
682    pub fn execute_with_temp_tables(
683        &self,
684        table: Arc<DataTable>,
685        sql: &str,
686        temp_tables: Option<&TempTableRegistry>,
687    ) -> Result<DataView> {
688        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
689        Ok(view)
690    }
691
692    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
693    pub fn execute_statement(
694        &self,
695        table: Arc<DataTable>,
696        statement: SelectStatement,
697    ) -> Result<DataView> {
698        self.execute_statement_with_temp_tables(table, statement, None)
699    }
700
701    /// Execute a parsed SelectStatement with optional temp table access
702    pub fn execute_statement_with_temp_tables(
703        &self,
704        table: Arc<DataTable>,
705        statement: SelectStatement,
706        temp_tables: Option<&TempTableRegistry>,
707    ) -> Result<DataView> {
708        // First process CTEs to build context
709        let mut cte_context = HashMap::new();
710
711        // Add temp tables to CTE context if provided
712        if let Some(temp_registry) = temp_tables {
713            for table_name in temp_registry.list_tables() {
714                if let Some(temp_table) = temp_registry.get(&table_name) {
715                    debug!("Adding temp table {} to CTE context", table_name);
716                    let view = DataView::new(temp_table);
717                    cte_context.insert(table_name, Arc::new(view));
718                }
719            }
720        }
721
722        for cte in &statement.ctes {
723            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
724            // Execute the CTE based on its type
725            let cte_result = match &cte.cte_type {
726                CTEType::Standard(query) => {
727                    // Execute the CTE query (it might reference earlier CTEs)
728                    let view = self.build_view_with_context(
729                        table.clone(),
730                        query.clone(),
731                        &mut cte_context,
732                    )?;
733
734                    // Materialize the view and enrich columns with qualified names
735                    let mut materialized = self.materialize_view(view)?;
736
737                    // Enrich columns with qualified names for proper scoping
738                    for column in materialized.columns_mut() {
739                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
740                        column.source_table = Some(cte.name.clone());
741                    }
742
743                    DataView::new(Arc::new(materialized))
744                }
745                CTEType::Web(web_spec) => {
746                    // Fetch data from URL
747                    use crate::web::http_fetcher::WebDataFetcher;
748
749                    let fetcher = WebDataFetcher::new()?;
750                    // Pass None for query context (no full SQL available in these contexts)
751                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
752
753                    // Enrich columns with qualified names for proper scoping
754                    for column in data_table.columns_mut() {
755                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
756                        column.source_table = Some(cte.name.clone());
757                    }
758
759                    // Convert DataTable to DataView
760                    DataView::new(Arc::new(data_table))
761                }
762                CTEType::File(file_spec) => {
763                    let mut data_table =
764                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
765
766                    for column in data_table.columns_mut() {
767                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
768                        column.source_table = Some(cte.name.clone());
769                    }
770
771                    DataView::new(Arc::new(data_table))
772                }
773            };
774            // Store the result in the context for later use
775            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
776            debug!(
777                "QueryEngine: CTE '{}' pre-processed, stored in context",
778                cte.name
779            );
780        }
781
782        // Now process subqueries with CTE context available
783        let mut subquery_executor =
784            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
785        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
786
787        // Build the view with the same CTE context
788        self.build_view_with_context(table, processed_statement, &mut cte_context)
789    }
790
791    /// Execute a statement with provided CTE context (for subqueries)
792    pub fn execute_statement_with_cte_context(
793        &self,
794        table: Arc<DataTable>,
795        statement: SelectStatement,
796        cte_context: &HashMap<String, Arc<DataView>>,
797    ) -> Result<DataView> {
798        // Clone the context so we can add any CTEs from this statement
799        let mut local_context = cte_context.clone();
800
801        // Process any CTEs in this statement (they might be nested)
802        for cte in &statement.ctes {
803            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
804            let cte_result = match &cte.cte_type {
805                CTEType::Standard(query) => {
806                    let view = self.build_view_with_context(
807                        table.clone(),
808                        query.clone(),
809                        &mut local_context,
810                    )?;
811
812                    // Materialize the view and enrich columns with qualified names
813                    let mut materialized = self.materialize_view(view)?;
814
815                    // Enrich columns with qualified names for proper scoping
816                    for column in materialized.columns_mut() {
817                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
818                        column.source_table = Some(cte.name.clone());
819                    }
820
821                    DataView::new(Arc::new(materialized))
822                }
823                CTEType::Web(web_spec) => {
824                    // Fetch data from URL
825                    use crate::web::http_fetcher::WebDataFetcher;
826
827                    let fetcher = WebDataFetcher::new()?;
828                    // Pass None for query context (no full SQL available in these contexts)
829                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
830
831                    // Enrich columns with qualified names for proper scoping
832                    for column in data_table.columns_mut() {
833                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
834                        column.source_table = Some(cte.name.clone());
835                    }
836
837                    // Convert DataTable to DataView
838                    DataView::new(Arc::new(data_table))
839                }
840                CTEType::File(file_spec) => {
841                    let mut data_table =
842                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
843
844                    for column in data_table.columns_mut() {
845                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
846                        column.source_table = Some(cte.name.clone());
847                    }
848
849                    DataView::new(Arc::new(data_table))
850                }
851            };
852            local_context.insert(cte.name.clone(), Arc::new(cte_result));
853        }
854
855        // Process subqueries with the complete context
856        let mut subquery_executor =
857            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
858        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
859
860        // Build the view
861        self.build_view_with_context(table, processed_statement, &mut local_context)
862    }
863
864    /// Execute a query and return both the result and the execution plan
865    pub fn execute_with_plan(
866        &self,
867        table: Arc<DataTable>,
868        sql: &str,
869    ) -> Result<(DataView, ExecutionPlan)> {
870        self.execute_with_plan_and_temp_tables(table, sql, None)
871    }
872
873    /// Execute a query with temp tables and return both the result and the execution plan
874    pub fn execute_with_plan_and_temp_tables(
875        &self,
876        table: Arc<DataTable>,
877        sql: &str,
878        temp_tables: Option<&TempTableRegistry>,
879    ) -> Result<(DataView, ExecutionPlan)> {
880        let mut plan_builder = ExecutionPlanBuilder::new();
881        let start_time = Instant::now();
882
883        // Parse the SQL query
884        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
885        plan_builder.add_detail(format!("Query: {}", sql));
886        let mut parser = Parser::new(sql);
887        let statement = parser
888            .parse()
889            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
890        plan_builder.add_detail(format!("Parsed successfully"));
891        if let Some(ref from_source) = statement.from_source {
892            match from_source {
893                TableSource::Table(name) => {
894                    plan_builder.add_detail(format!("FROM: {}", name));
895                }
896                TableSource::DerivedTable { alias, .. } => {
897                    plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
898                }
899                TableSource::Pivot { .. } => {
900                    plan_builder.add_detail("FROM: PIVOT".to_string());
901                }
902            }
903        }
904        if statement.where_clause.is_some() {
905            plan_builder.add_detail("WHERE clause present".to_string());
906        }
907        plan_builder.end_step();
908
909        // First process CTEs to build context
910        let mut cte_context = HashMap::new();
911
912        // Add temp tables to CTE context if provided
913        if let Some(temp_registry) = temp_tables {
914            for table_name in temp_registry.list_tables() {
915                if let Some(temp_table) = temp_registry.get(&table_name) {
916                    debug!("Adding temp table {} to CTE context", table_name);
917                    let view = DataView::new(temp_table);
918                    cte_context.insert(table_name, Arc::new(view));
919                }
920            }
921        }
922
923        if !statement.ctes.is_empty() {
924            plan_builder.begin_step(
925                StepType::CTE,
926                format!("Process {} CTEs", statement.ctes.len()),
927            );
928
929            for cte in &statement.ctes {
930                let cte_start = Instant::now();
931                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
932
933                let cte_result = match &cte.cte_type {
934                    CTEType::Standard(query) => {
935                        // Add CTE query details
936                        if let Some(ref from_source) = query.from_source {
937                            match from_source {
938                                TableSource::Table(name) => {
939                                    plan_builder.add_detail(format!("Source: {}", name));
940                                }
941                                TableSource::DerivedTable { alias, .. } => {
942                                    plan_builder
943                                        .add_detail(format!("Source: derived table ({})", alias));
944                                }
945                                TableSource::Pivot { .. } => {
946                                    plan_builder.add_detail("Source: PIVOT".to_string());
947                                }
948                            }
949                        }
950                        if query.where_clause.is_some() {
951                            plan_builder.add_detail("Has WHERE clause".to_string());
952                        }
953                        if query.group_by.is_some() {
954                            plan_builder.add_detail("Has GROUP BY".to_string());
955                        }
956
957                        debug!(
958                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
959                            cte.name,
960                            cte_context.keys().collect::<Vec<_>>()
961                        );
962
963                        // Process subqueries in the CTE's query FIRST
964                        // This allows the subqueries to see all previously defined CTEs
965                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
966                            self.clone(),
967                            table.clone(),
968                            cte_context.clone(),
969                        );
970                        let processed_query = subquery_executor.execute_subqueries(query)?;
971
972                        let view = self.build_view_with_context(
973                            table.clone(),
974                            processed_query,
975                            &mut cte_context,
976                        )?;
977
978                        // Materialize the view and enrich columns with qualified names
979                        let mut materialized = self.materialize_view(view)?;
980
981                        // Enrich columns with qualified names for proper scoping
982                        for column in materialized.columns_mut() {
983                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
984                            column.source_table = Some(cte.name.clone());
985                        }
986
987                        DataView::new(Arc::new(materialized))
988                    }
989                    CTEType::Web(web_spec) => {
990                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
991                        if let Some(format) = &web_spec.format {
992                            plan_builder.add_detail(format!("Format: {:?}", format));
993                        }
994                        if let Some(cache) = web_spec.cache_seconds {
995                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
996                        }
997
998                        // Fetch data from URL
999                        use crate::web::http_fetcher::WebDataFetcher;
1000
1001                        let fetcher = WebDataFetcher::new()?;
1002                        // Pass None for query context - each WEB CTE is independent
1003                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
1004
1005                        // Enrich columns with qualified names for proper scoping
1006                        for column in data_table.columns_mut() {
1007                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1008                            column.source_table = Some(cte.name.clone());
1009                        }
1010
1011                        // Convert DataTable to DataView
1012                        DataView::new(Arc::new(data_table))
1013                    }
1014                    CTEType::File(file_spec) => {
1015                        plan_builder.add_detail(format!("PATH: {}", file_spec.path));
1016                        if file_spec.recursive {
1017                            plan_builder.add_detail("RECURSIVE".to_string());
1018                        }
1019                        if let Some(ref g) = file_spec.glob {
1020                            plan_builder.add_detail(format!("GLOB: {}", g));
1021                        }
1022                        if let Some(d) = file_spec.max_depth {
1023                            plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1024                        }
1025
1026                        let mut data_table =
1027                            crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1028
1029                        for column in data_table.columns_mut() {
1030                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1031                            column.source_table = Some(cte.name.clone());
1032                        }
1033
1034                        DataView::new(Arc::new(data_table))
1035                    }
1036                };
1037
1038                // Record CTE statistics
1039                plan_builder.set_rows_out(cte_result.row_count());
1040                plan_builder.add_detail(format!(
1041                    "Result: {} rows, {} columns",
1042                    cte_result.row_count(),
1043                    cte_result.column_count()
1044                ));
1045                plan_builder.add_detail(format!(
1046                    "Execution time: {:.3}ms",
1047                    cte_start.elapsed().as_secs_f64() * 1000.0
1048                ));
1049
1050                debug!(
1051                    "QueryEngine: Storing CTE '{}' in context with {} rows",
1052                    cte.name,
1053                    cte_result.row_count()
1054                );
1055                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1056                plan_builder.end_step();
1057            }
1058
1059            plan_builder.add_detail(format!(
1060                "All {} CTEs cached in context",
1061                statement.ctes.len()
1062            ));
1063            plan_builder.end_step();
1064        }
1065
1066        // Process subqueries in the statement with CTE context
1067        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1068        let mut subquery_executor =
1069            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1070
1071        // Check if there are subqueries to process
1072        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1073            // This is a simplified check - in reality we'd need to walk the AST
1074            format!("{:?}", w).contains("Subquery")
1075        });
1076
1077        if has_subqueries {
1078            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1079        }
1080
1081        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1082
1083        if has_subqueries {
1084            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1085        } else {
1086            plan_builder.add_detail("No subqueries to process".to_string());
1087        }
1088
1089        plan_builder.end_step();
1090        let result = self.build_view_with_context_and_plan(
1091            table,
1092            processed_statement,
1093            &mut cte_context,
1094            &mut plan_builder,
1095        )?;
1096
1097        let total_duration = start_time.elapsed();
1098        info!(
1099            "Query execution complete: total={:?}, rows={}",
1100            total_duration,
1101            result.row_count()
1102        );
1103
1104        let plan = plan_builder.build();
1105        Ok((result, plan))
1106    }
1107
1108    /// Build a `DataView` from a parsed SQL statement
1109    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1110        let mut cte_context = HashMap::new();
1111        self.build_view_with_context(table, statement, &mut cte_context)
1112    }
1113
1114    /// Build a DataView from a SelectStatement with CTE context
1115    fn build_view_with_context(
1116        &self,
1117        table: Arc<DataTable>,
1118        statement: SelectStatement,
1119        cte_context: &mut HashMap<String, Arc<DataView>>,
1120    ) -> Result<DataView> {
1121        let mut dummy_plan = ExecutionPlanBuilder::new();
1122        let mut exec_context = ExecutionContext::new();
1123        self.build_view_with_context_and_plan_and_exec(
1124            table,
1125            statement,
1126            cte_context,
1127            &mut dummy_plan,
1128            &mut exec_context,
1129        )
1130    }
1131
1132    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
1133    fn build_view_with_context_and_plan(
1134        &self,
1135        table: Arc<DataTable>,
1136        statement: SelectStatement,
1137        cte_context: &mut HashMap<String, Arc<DataView>>,
1138        plan: &mut ExecutionPlanBuilder,
1139    ) -> Result<DataView> {
1140        let mut exec_context = ExecutionContext::new();
1141        self.build_view_with_context_and_plan_and_exec(
1142            table,
1143            statement,
1144            cte_context,
1145            plan,
1146            &mut exec_context,
1147        )
1148    }
1149
1150    /// Build a DataView with CTE context, execution plan, and alias resolution context
1151    fn build_view_with_context_and_plan_and_exec(
1152        &self,
1153        table: Arc<DataTable>,
1154        statement: SelectStatement,
1155        cte_context: &mut HashMap<String, Arc<DataView>>,
1156        plan: &mut ExecutionPlanBuilder,
1157        exec_context: &mut ExecutionContext,
1158    ) -> Result<DataView> {
1159        // First, process any CTEs that aren't already in the context
1160        for cte in &statement.ctes {
1161            // Skip if already processed (e.g., by execute_select for WEB CTEs)
1162            if cte_context.contains_key(&cte.name) {
1163                debug!(
1164                    "QueryEngine: CTE '{}' already in context, skipping",
1165                    cte.name
1166                );
1167                continue;
1168            }
1169
1170            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1171            debug!(
1172                "QueryEngine: Available CTEs for '{}': {:?}",
1173                cte.name,
1174                cte_context.keys().collect::<Vec<_>>()
1175            );
1176
1177            // Execute the CTE query (it might reference earlier CTEs)
1178            let cte_result = match &cte.cte_type {
1179                CTEType::Standard(query) => {
1180                    let view =
1181                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1182
1183                    // Materialize the view and enrich columns with qualified names
1184                    let mut materialized = self.materialize_view(view)?;
1185
1186                    // Enrich columns with qualified names for proper scoping
1187                    for column in materialized.columns_mut() {
1188                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1189                        column.source_table = Some(cte.name.clone());
1190                    }
1191
1192                    DataView::new(Arc::new(materialized))
1193                }
1194                CTEType::Web(_web_spec) => {
1195                    // Web CTEs should have been processed earlier in execute_select
1196                    return Err(anyhow!(
1197                        "Web CTEs should be processed in execute_select method"
1198                    ));
1199                }
1200                CTEType::File(_file_spec) => {
1201                    // FILE CTEs (like WEB) should be processed earlier in execute_select
1202                    return Err(anyhow!(
1203                        "FILE CTEs should be processed in execute_select method"
1204                    ));
1205                }
1206            };
1207
1208            // Store the result in the context for later use
1209            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1210            debug!(
1211                "QueryEngine: CTE '{}' processed, stored in context",
1212                cte.name
1213            );
1214        }
1215
1216        // Determine the source table for the main query
1217        let source_table = if let Some(ref from_source) = statement.from_source {
1218            match from_source {
1219                TableSource::Table(table_name) => {
1220                    // Check if this references a CTE (case-insensitive fallback)
1221                    if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1222                        debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1223                        // Materialize the CTE view as a table
1224                        let mut materialized = self.materialize_view((**cte_view).clone())?;
1225
1226                        // Apply alias to qualified column names if present
1227                        #[allow(deprecated)]
1228                        if let Some(ref alias) = statement.from_alias {
1229                            debug!(
1230                                "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1231                                alias, table_name
1232                            );
1233                            for column in materialized.columns_mut() {
1234                                // Replace the CTE name with the alias in qualified names
1235                                if let Some(ref qualified_name) = column.qualified_name {
1236                                    if qualified_name.starts_with(&format!("{}.", table_name)) {
1237                                        column.qualified_name = Some(qualified_name.replace(
1238                                            &format!("{}.", table_name),
1239                                            &format!("{}.", alias),
1240                                        ));
1241                                    }
1242                                }
1243                                // Update source table to reflect the alias
1244                                if column.source_table.as_ref() == Some(table_name) {
1245                                    column.source_table = Some(alias.clone());
1246                                }
1247                            }
1248                        }
1249
1250                        Arc::new(materialized)
1251                    } else {
1252                        // Regular table reference - use the provided table
1253                        table.clone()
1254                    }
1255                }
1256                TableSource::DerivedTable { query, alias } => {
1257                    // Execute the subquery and use its result as the source
1258                    debug!(
1259                        "QueryEngine: Processing FROM derived table (alias: {})",
1260                        alias
1261                    );
1262                    let subquery_result =
1263                        self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1264
1265                    // Convert the DataView to a DataTable for use as source
1266                    // This materializes the subquery result
1267                    let mut materialized = self.materialize_view(subquery_result)?;
1268
1269                    // Apply the alias to all columns in the derived table
1270                    // Note: We set source_table but keep the column names unqualified
1271                    // so they can be referenced without the table prefix
1272                    for column in materialized.columns_mut() {
1273                        column.source_table = Some(alias.clone());
1274                    }
1275
1276                    Arc::new(materialized)
1277                }
1278                TableSource::Pivot { .. } => {
1279                    // PIVOT should have been expanded by PivotExpander transformer
1280                    return Err(anyhow!(
1281                        "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1282                    ));
1283                }
1284            }
1285        } else {
1286            // Fallback to deprecated fields for backward compatibility
1287            #[allow(deprecated)]
1288            if let Some(ref table_func) = statement.from_function {
1289                // Handle table functions like RANGE()
1290                debug!("QueryEngine: Processing table function (deprecated field)...");
1291                match table_func {
1292                    TableFunction::Generator { name, args } => {
1293                        // Use the generator registry to create the table
1294                        use crate::sql::generators::GeneratorRegistry;
1295
1296                        // Create generator registry (could be cached in QueryEngine)
1297                        let registry = GeneratorRegistry::new();
1298
1299                        if let Some(generator) = registry.get(name) {
1300                            // Evaluate arguments
1301                            let mut evaluator = ArithmeticEvaluator::with_date_notation(
1302                                &table,
1303                                self.date_notation.clone(),
1304                            );
1305                            let dummy_row = 0;
1306
1307                            let mut evaluated_args = Vec::new();
1308                            for arg in args {
1309                                evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1310                            }
1311
1312                            // Generate the table
1313                            generator.generate(evaluated_args)?
1314                        } else {
1315                            return Err(anyhow!("Unknown generator function: {}", name));
1316                        }
1317                    }
1318                }
1319            } else {
1320                #[allow(deprecated)]
1321                if let Some(ref subquery) = statement.from_subquery {
1322                    // Execute the subquery and use its result as the source
1323                    debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1324                    let subquery_result = self.build_view_with_context(
1325                        table.clone(),
1326                        *subquery.clone(),
1327                        cte_context,
1328                    )?;
1329
1330                    // Convert the DataView to a DataTable for use as source
1331                    // This materializes the subquery result
1332                    let materialized = self.materialize_view(subquery_result)?;
1333                    Arc::new(materialized)
1334                } else {
1335                    #[allow(deprecated)]
1336                    if let Some(ref table_name) = statement.from_table {
1337                        // Check if this references a CTE (case-insensitive fallback)
1338                        if let Some(cte_view) = resolve_cte(cte_context, table_name) {
1339                            debug!(
1340                                "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1341                                table_name
1342                            );
1343                            // Materialize the CTE view as a table
1344                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1345
1346                            // Apply alias to qualified column names if present
1347                            #[allow(deprecated)]
1348                            if let Some(ref alias) = statement.from_alias {
1349                                debug!(
1350                                    "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1351                                    alias, table_name
1352                                );
1353                                for column in materialized.columns_mut() {
1354                                    // Replace the CTE name with the alias in qualified names
1355                                    if let Some(ref qualified_name) = column.qualified_name {
1356                                        if qualified_name.starts_with(&format!("{}.", table_name)) {
1357                                            column.qualified_name = Some(qualified_name.replace(
1358                                                &format!("{}.", table_name),
1359                                                &format!("{}.", alias),
1360                                            ));
1361                                        }
1362                                    }
1363                                    // Update source table to reflect the alias
1364                                    if column.source_table.as_ref() == Some(table_name) {
1365                                        column.source_table = Some(alias.clone());
1366                                    }
1367                                }
1368                            }
1369
1370                            Arc::new(materialized)
1371                        } else {
1372                            // Regular table reference - use the provided table
1373                            table.clone()
1374                        }
1375                    } else {
1376                        // No FROM clause - use the provided table
1377                        table.clone()
1378                    }
1379                }
1380            }
1381        };
1382
1383        // Register alias in execution context if present
1384        #[allow(deprecated)]
1385        if let Some(ref alias) = statement.from_alias {
1386            #[allow(deprecated)]
1387            if let Some(ref table_name) = statement.from_table {
1388                exec_context.register_alias(alias.clone(), table_name.clone());
1389            }
1390        }
1391
1392        // Process JOINs if present
1393        let final_table = if !statement.joins.is_empty() {
1394            plan.begin_step(
1395                StepType::Join,
1396                format!("Process {} JOINs", statement.joins.len()),
1397            );
1398            plan.set_rows_in(source_table.row_count());
1399
1400            let join_executor = HashJoinExecutor::new(self.case_insensitive);
1401            let mut current_table = source_table;
1402
1403            for (idx, join_clause) in statement.joins.iter().enumerate() {
1404                let join_start = Instant::now();
1405                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1406                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1407                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1408                plan.add_detail(format!(
1409                    "Executing {:?} JOIN on {} condition(s)",
1410                    join_clause.join_type,
1411                    join_clause.condition.conditions.len()
1412                ));
1413
1414                // Resolve the right table for the join
1415                let right_table = match &join_clause.table {
1416                    TableSource::Table(name) => {
1417                        // Check if it's a CTE reference (case-insensitive fallback)
1418                        if let Some(cte_view) = resolve_cte(cte_context, name) {
1419                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1420
1421                            // Apply alias to qualified column names if present
1422                            if let Some(ref alias) = join_clause.alias {
1423                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1424                                for column in materialized.columns_mut() {
1425                                    // Replace the CTE name with the alias in qualified names
1426                                    if let Some(ref qualified_name) = column.qualified_name {
1427                                        if qualified_name.starts_with(&format!("{}.", name)) {
1428                                            column.qualified_name = Some(qualified_name.replace(
1429                                                &format!("{}.", name),
1430                                                &format!("{}.", alias),
1431                                            ));
1432                                        }
1433                                    }
1434                                    // Update source table to reflect the alias
1435                                    if column.source_table.as_ref() == Some(name) {
1436                                        column.source_table = Some(alias.clone());
1437                                    }
1438                                }
1439                            }
1440
1441                            Arc::new(materialized)
1442                        } else {
1443                            // For now, we need the actual table data
1444                            // In a real implementation, this would load from file
1445                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1446                        }
1447                    }
1448                    TableSource::DerivedTable { query, alias: _ } => {
1449                        // Execute the subquery
1450                        let subquery_result = self.build_view_with_context(
1451                            table.clone(),
1452                            *query.clone(),
1453                            cte_context,
1454                        )?;
1455                        let materialized = self.materialize_view(subquery_result)?;
1456                        Arc::new(materialized)
1457                    }
1458                    TableSource::Pivot { .. } => {
1459                        // PIVOT in JOIN is not supported yet (will be handled by transformer)
1460                        return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1461                    }
1462                };
1463
1464                // Execute the join
1465                let joined = join_executor.execute_join(
1466                    current_table.clone(),
1467                    join_clause,
1468                    right_table.clone(),
1469                )?;
1470
1471                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1472                plan.set_rows_out(joined.row_count());
1473                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1474                plan.add_detail(format!(
1475                    "Join time: {:.3}ms",
1476                    join_start.elapsed().as_secs_f64() * 1000.0
1477                ));
1478                plan.end_step();
1479
1480                current_table = Arc::new(joined);
1481            }
1482
1483            plan.set_rows_out(current_table.row_count());
1484            plan.add_detail(format!(
1485                "Final result after all joins: {} rows",
1486                current_table.row_count()
1487            ));
1488            plan.end_step();
1489            current_table
1490        } else {
1491            source_table
1492        };
1493
1494        // Continue with the existing build_view logic but using final_table
1495        self.build_view_internal_with_plan_and_exec(
1496            final_table,
1497            statement,
1498            plan,
1499            Some(exec_context),
1500        )
1501    }
1502
1503    /// Materialize a DataView into a new DataTable
1504    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1505        let source = view.source();
1506        let mut result_table = DataTable::new("derived");
1507
1508        // Get the visible columns from the view
1509        let visible_cols = view.visible_column_indices().to_vec();
1510
1511        // Copy column definitions
1512        for col_idx in &visible_cols {
1513            let col = &source.columns[*col_idx];
1514            let new_col = DataColumn {
1515                name: col.name.clone(),
1516                data_type: col.data_type.clone(),
1517                nullable: col.nullable,
1518                unique_values: col.unique_values,
1519                null_count: col.null_count,
1520                metadata: col.metadata.clone(),
1521                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1522                source_table: col.source_table.clone(),     // Preserve source table
1523            };
1524            result_table.add_column(new_col);
1525        }
1526
1527        // Copy visible rows
1528        for row_idx in view.visible_row_indices() {
1529            let source_row = &source.rows[*row_idx];
1530            let mut new_row = DataRow { values: Vec::new() };
1531
1532            for col_idx in &visible_cols {
1533                new_row.values.push(source_row.values[*col_idx].clone());
1534            }
1535
1536            result_table.add_row(new_row);
1537        }
1538
1539        Ok(result_table)
1540    }
1541
1542    fn build_view_internal(
1543        &self,
1544        table: Arc<DataTable>,
1545        statement: SelectStatement,
1546    ) -> Result<DataView> {
1547        let mut dummy_plan = ExecutionPlanBuilder::new();
1548        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1549    }
1550
1551    fn build_view_internal_with_plan(
1552        &self,
1553        table: Arc<DataTable>,
1554        statement: SelectStatement,
1555        plan: &mut ExecutionPlanBuilder,
1556    ) -> Result<DataView> {
1557        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1558    }
1559
1560    fn build_view_internal_with_plan_and_exec(
1561        &self,
1562        table: Arc<DataTable>,
1563        statement: SelectStatement,
1564        plan: &mut ExecutionPlanBuilder,
1565        exec_context: Option<&ExecutionContext>,
1566    ) -> Result<DataView> {
1567        debug!(
1568            "QueryEngine::build_view - select_items: {:?}",
1569            statement.select_items
1570        );
1571        debug!(
1572            "QueryEngine::build_view - where_clause: {:?}",
1573            statement.where_clause
1574        );
1575
1576        // Start with all rows visible
1577        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1578
1579        // Apply WHERE clause filtering using recursive evaluator
1580        if let Some(where_clause) = &statement.where_clause {
1581            let total_rows = table.row_count();
1582            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1583            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1584
1585            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1586            plan.set_rows_in(total_rows);
1587            plan.add_detail(format!("Input: {} rows", total_rows));
1588
1589            // Add details about WHERE conditions
1590            for condition in &where_clause.conditions {
1591                plan.add_detail(format!("Condition: {:?}", condition.expr));
1592            }
1593
1594            let filter_start = Instant::now();
1595            // Create an evaluation context for caching compiled regexes
1596            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1597
1598            // Create evaluator ONCE before the loop for performance
1599            let mut evaluator = if let Some(exec_ctx) = exec_context {
1600                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1601                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1602            } else {
1603                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1604            };
1605
1606            // Filter visible rows based on WHERE clause
1607            let mut filtered_rows = Vec::new();
1608            for row_idx in visible_rows {
1609                // Only log for first few rows to avoid performance impact
1610                if row_idx < 3 {
1611                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1612                }
1613
1614                match evaluator.evaluate(where_clause, row_idx) {
1615                    Ok(result) => {
1616                        if row_idx < 3 {
1617                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1618                        }
1619                        if result {
1620                            filtered_rows.push(row_idx);
1621                        }
1622                    }
1623                    Err(e) => {
1624                        if row_idx < 3 {
1625                            debug!(
1626                                "QueryEngine: WHERE evaluation error for row {}: {}",
1627                                row_idx, e
1628                            );
1629                        }
1630                        // Propagate WHERE clause errors instead of silently ignoring them
1631                        return Err(e);
1632                    }
1633                }
1634            }
1635
1636            // Log regex cache statistics
1637            let (compilations, cache_hits) = eval_context.get_stats();
1638            if compilations > 0 || cache_hits > 0 {
1639                debug!(
1640                    "LIKE pattern cache: {} compilations, {} cache hits",
1641                    compilations, cache_hits
1642                );
1643            }
1644            visible_rows = filtered_rows;
1645            let filter_duration = filter_start.elapsed();
1646            info!(
1647                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1648                total_rows,
1649                visible_rows.len(),
1650                filter_duration
1651            );
1652
1653            plan.set_rows_out(visible_rows.len());
1654            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1655            plan.add_detail(format!(
1656                "Filter time: {:.3}ms",
1657                filter_duration.as_secs_f64() * 1000.0
1658            ));
1659            plan.end_step();
1660        }
1661
1662        // Create initial DataView with filtered rows
1663        let mut view = DataView::new(table.clone());
1664        view = view.with_rows(visible_rows);
1665
1666        // Handle GROUP BY if present
1667        if let Some(group_by_exprs) = &statement.group_by {
1668            if !group_by_exprs.is_empty() {
1669                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1670
1671                plan.begin_step(
1672                    StepType::GroupBy,
1673                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1674                );
1675                plan.set_rows_in(view.row_count());
1676                plan.add_detail(format!("Input: {} rows", view.row_count()));
1677                for expr in group_by_exprs {
1678                    plan.add_detail(format!("Group by: {:?}", expr));
1679                }
1680
1681                let group_start = Instant::now();
1682                view = self.apply_group_by(
1683                    view,
1684                    group_by_exprs,
1685                    &statement.select_items,
1686                    statement.having.as_ref(),
1687                    plan,
1688                )?;
1689
1690                // Hide any columns that were promoted from HAVING (synthetic aggregates)
1691                // These have the __hidden_agg_ prefix and should not appear in output
1692                use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1693                let hidden_indices: Vec<usize> = view
1694                    .source()
1695                    .columns
1696                    .iter()
1697                    .enumerate()
1698                    .filter_map(|(i, c)| {
1699                        if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1700                            Some(i)
1701                        } else {
1702                            None
1703                        }
1704                    })
1705                    .collect();
1706                for &idx in hidden_indices.iter().rev() {
1707                    view.hide_column(idx);
1708                }
1709
1710                plan.set_rows_out(view.row_count());
1711                plan.add_detail(format!("Output: {} groups", view.row_count()));
1712                plan.add_detail(format!(
1713                    "Overall time: {:.3}ms",
1714                    group_start.elapsed().as_secs_f64() * 1000.0
1715                ));
1716                plan.end_step();
1717            }
1718        } else {
1719            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1720            if !statement.select_items.is_empty() {
1721                // Check if we have ANY non-star items (not just the first one)
1722                let has_non_star_items = statement
1723                    .select_items
1724                    .iter()
1725                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1726
1727                // Apply select items if:
1728                // 1. We have computed expressions or explicit columns
1729                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1730                if has_non_star_items || statement.select_items.len() > 1 {
1731                    view = self.apply_select_items(
1732                        view,
1733                        &statement.select_items,
1734                        &statement,
1735                        exec_context,
1736                        plan,
1737                    )?;
1738                }
1739                // If it's just a single star, no projection needed
1740            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1741                debug!("QueryEngine: Using legacy columns path");
1742                // Fallback to legacy column projection for backward compatibility
1743                // Use the current view's source table, not the original table
1744                let source_table = view.source();
1745                let column_indices =
1746                    self.resolve_column_indices(source_table, &statement.columns)?;
1747                view = view.with_columns(column_indices);
1748            }
1749        }
1750
1751        // Apply DISTINCT if specified
1752        if statement.distinct {
1753            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1754            plan.set_rows_in(view.row_count());
1755            plan.add_detail(format!("Input: {} rows", view.row_count()));
1756
1757            let distinct_start = Instant::now();
1758            view = self.apply_distinct(view)?;
1759
1760            plan.set_rows_out(view.row_count());
1761            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1762            plan.add_detail(format!(
1763                "Distinct time: {:.3}ms",
1764                distinct_start.elapsed().as_secs_f64() * 1000.0
1765            ));
1766            plan.end_step();
1767        }
1768
1769        // Apply ORDER BY sorting
1770        if let Some(order_by_columns) = &statement.order_by {
1771            if !order_by_columns.is_empty() {
1772                plan.begin_step(
1773                    StepType::Sort,
1774                    format!("ORDER BY {} columns", order_by_columns.len()),
1775                );
1776                plan.set_rows_in(view.row_count());
1777                for col in order_by_columns {
1778                    // Format the expression (simplified for now - just show column name or "expr")
1779                    let expr_str = match &col.expr {
1780                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1781                        _ => "expr".to_string(),
1782                    };
1783                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1784                }
1785
1786                let sort_start = Instant::now();
1787                view =
1788                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1789
1790                plan.add_detail(format!(
1791                    "Sort time: {:.3}ms",
1792                    sort_start.elapsed().as_secs_f64() * 1000.0
1793                ));
1794                plan.end_step();
1795            }
1796        }
1797
1798        // Strip columns promoted by OrderByAliasTransformer for ORDER BY visibility.
1799        // Unlike the HIDDEN_AGG_PREFIX strip (which runs right after GROUP BY)
1800        // this MUST run after ORDER BY — the whole point of the promotion is
1801        // that the column has to survive projection long enough to be sorted on.
1802        {
1803            use crate::query_plan::order_by_alias_transformer::HIDDEN_ORDERBY_PREFIX;
1804            let hidden_indices: Vec<usize> = view
1805                .source()
1806                .columns
1807                .iter()
1808                .enumerate()
1809                .filter_map(|(i, c)| {
1810                    if c.name.starts_with(HIDDEN_ORDERBY_PREFIX) {
1811                        Some(i)
1812                    } else {
1813                        None
1814                    }
1815                })
1816                .collect();
1817            for &idx in hidden_indices.iter().rev() {
1818                view.hide_column(idx);
1819            }
1820        }
1821
1822        // Apply LIMIT/OFFSET
1823        if let Some(limit) = statement.limit {
1824            let offset = statement.offset.unwrap_or(0);
1825            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1826            plan.set_rows_in(view.row_count());
1827            if offset > 0 {
1828                plan.add_detail(format!("OFFSET: {}", offset));
1829            }
1830            view = view.with_limit(limit, offset);
1831            plan.set_rows_out(view.row_count());
1832            plan.add_detail(format!("Output: {} rows", view.row_count()));
1833            plan.end_step();
1834        }
1835
1836        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1837        if !statement.set_operations.is_empty() {
1838            plan.begin_step(
1839                StepType::SetOperation,
1840                format!("Process {} set operations", statement.set_operations.len()),
1841            );
1842            plan.set_rows_in(view.row_count());
1843
1844            // Materialize the first result set
1845            let mut combined_table = self.materialize_view(view)?;
1846            let first_columns = combined_table.column_names();
1847            let first_column_count = first_columns.len();
1848
1849            // Track if any operation requires deduplication
1850            let mut needs_deduplication = false;
1851
1852            // Process each set operation
1853            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1854                let op_start = Instant::now();
1855                plan.begin_step(
1856                    StepType::SetOperation,
1857                    format!("{:?} operation #{}", operation, idx + 1),
1858                );
1859
1860                // Execute the next SELECT statement
1861                // We need to pass the original table and exec_context for proper resolution
1862                let next_view = if let Some(exec_ctx) = exec_context {
1863                    self.build_view_internal_with_plan_and_exec(
1864                        table.clone(),
1865                        *next_statement.clone(),
1866                        plan,
1867                        Some(exec_ctx),
1868                    )?
1869                } else {
1870                    self.build_view_internal_with_plan(
1871                        table.clone(),
1872                        *next_statement.clone(),
1873                        plan,
1874                    )?
1875                };
1876
1877                // Materialize the next result set
1878                let next_table = self.materialize_view(next_view)?;
1879                let next_columns = next_table.column_names();
1880                let next_column_count = next_columns.len();
1881
1882                // Validate schema compatibility
1883                if first_column_count != next_column_count {
1884                    return Err(anyhow!(
1885                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1886                        first_column_count,
1887                        idx + 2,
1888                        next_column_count
1889                    ));
1890                }
1891
1892                // Warn if column names don't match (but allow it - some SQL dialects do)
1893                for (col_idx, (first_col, next_col)) in
1894                    first_columns.iter().zip(next_columns.iter()).enumerate()
1895                {
1896                    if !first_col.eq_ignore_ascii_case(next_col) {
1897                        debug!(
1898                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1899                            col_idx + 1,
1900                            first_col,
1901                            next_col
1902                        );
1903                    }
1904                }
1905
1906                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1907                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1908
1909                // Perform the set operation
1910                match operation {
1911                    SetOperation::UnionAll => {
1912                        // UNION ALL: Simply concatenate all rows without deduplication
1913                        for row in next_table.rows.iter() {
1914                            combined_table.add_row(row.clone());
1915                        }
1916                        plan.add_detail(format!(
1917                            "Result: {} rows (no deduplication)",
1918                            combined_table.row_count()
1919                        ));
1920                    }
1921                    SetOperation::Union => {
1922                        // UNION: Concatenate all rows first, deduplicate at the end
1923                        for row in next_table.rows.iter() {
1924                            combined_table.add_row(row.clone());
1925                        }
1926                        needs_deduplication = true;
1927                        plan.add_detail(format!(
1928                            "Combined: {} rows (deduplication pending)",
1929                            combined_table.row_count()
1930                        ));
1931                    }
1932                    SetOperation::Intersect => {
1933                        // INTERSECT: Keep only rows that appear in both
1934                        // TODO: Implement intersection logic
1935                        return Err(anyhow!("INTERSECT is not yet implemented"));
1936                    }
1937                    SetOperation::Except => {
1938                        // EXCEPT: Keep only rows from left that don't appear in right
1939                        // TODO: Implement except logic
1940                        return Err(anyhow!("EXCEPT is not yet implemented"));
1941                    }
1942                }
1943
1944                plan.add_detail(format!(
1945                    "Operation time: {:.3}ms",
1946                    op_start.elapsed().as_secs_f64() * 1000.0
1947                ));
1948                plan.set_rows_out(combined_table.row_count());
1949                plan.end_step();
1950            }
1951
1952            plan.set_rows_out(combined_table.row_count());
1953            plan.add_detail(format!(
1954                "Combined result: {} rows after {} operations",
1955                combined_table.row_count(),
1956                statement.set_operations.len()
1957            ));
1958            plan.end_step();
1959
1960            // Create a new view from the combined table
1961            view = DataView::new(Arc::new(combined_table));
1962
1963            // Apply deduplication if any UNION (not UNION ALL) operation was used
1964            if needs_deduplication {
1965                plan.begin_step(
1966                    StepType::Distinct,
1967                    "UNION deduplication - remove duplicate rows".to_string(),
1968                );
1969                plan.set_rows_in(view.row_count());
1970                plan.add_detail(format!("Input: {} rows", view.row_count()));
1971
1972                let distinct_start = Instant::now();
1973                view = self.apply_distinct(view)?;
1974
1975                plan.set_rows_out(view.row_count());
1976                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1977                plan.add_detail(format!(
1978                    "Deduplication time: {:.3}ms",
1979                    distinct_start.elapsed().as_secs_f64() * 1000.0
1980                ));
1981                plan.end_step();
1982            }
1983        }
1984
1985        Ok(view)
1986    }
1987
1988    /// Resolve column names to indices
1989    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1990        let mut indices = Vec::new();
1991        let table_columns = table.column_names();
1992
1993        for col_name in columns {
1994            let index = table_columns
1995                .iter()
1996                .position(|c| c.eq_ignore_ascii_case(col_name))
1997                .ok_or_else(|| {
1998                    let suggestion = self.find_similar_column(table, col_name);
1999                    match suggestion {
2000                        Some(similar) => anyhow::anyhow!(
2001                            "Column '{}' not found. Did you mean '{}'?",
2002                            col_name,
2003                            similar
2004                        ),
2005                        None => anyhow::anyhow!("Column '{}' not found", col_name),
2006                    }
2007                })?;
2008            indices.push(index);
2009        }
2010
2011        Ok(indices)
2012    }
2013
2014    /// Apply SELECT items (columns and computed expressions) to create new view
2015    fn apply_select_items(
2016        &self,
2017        view: DataView,
2018        select_items: &[SelectItem],
2019        _statement: &SelectStatement,
2020        exec_context: Option<&ExecutionContext>,
2021        plan: &mut ExecutionPlanBuilder,
2022    ) -> Result<DataView> {
2023        debug!(
2024            "QueryEngine::apply_select_items - items: {:?}",
2025            select_items
2026        );
2027        debug!(
2028            "QueryEngine::apply_select_items - input view has {} rows",
2029            view.row_count()
2030        );
2031
2032        // Check if any select items contain window functions
2033        let has_window_functions = select_items.iter().any(|item| match item {
2034            SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2035            _ => false,
2036        });
2037
2038        // Count window functions for detailed reporting
2039        let window_func_count: usize = select_items
2040            .iter()
2041            .filter(|item| match item {
2042                SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2043                _ => false,
2044            })
2045            .count();
2046
2047        // Start timing for window function evaluation if present
2048        let window_start = if has_window_functions {
2049            debug!(
2050                "QueryEngine::apply_select_items - detected {} window functions",
2051                window_func_count
2052            );
2053
2054            // Extract window specs (Step 2: parallel path, not used yet)
2055            let window_specs = Self::extract_window_specs(select_items);
2056            debug!("Extracted {} window function specs", window_specs.len());
2057
2058            Some(Instant::now())
2059        } else {
2060            None
2061        };
2062
2063        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
2064        let has_unnest = select_items.iter().any(|item| match item {
2065            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2066            _ => false,
2067        });
2068
2069        if has_unnest {
2070            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2071            return self.apply_select_with_row_expansion(view, select_items);
2072        }
2073
2074        // Check if this is an aggregate query:
2075        // 1. At least one aggregate function exists
2076        // 2. All other items are either aggregates or constants (aggregate-compatible)
2077        let has_aggregates = select_items.iter().any(|item| match item {
2078            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2079            SelectItem::Column { .. } => false,
2080            SelectItem::Star { .. } => false,
2081            SelectItem::StarExclude { .. } => false,
2082        });
2083
2084        let all_aggregate_compatible = select_items.iter().all(|item| match item {
2085            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2086            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
2087            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
2088            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
2089        });
2090
2091        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2092            // Special handling for aggregate queries with constants (no GROUP BY)
2093            // These should produce exactly one row
2094            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2095            return self.apply_aggregate_select(view, select_items);
2096        }
2097
2098        // Check if we need to create computed columns
2099        let has_computed_expressions = select_items
2100            .iter()
2101            .any(|item| matches!(item, SelectItem::Expression { .. }));
2102
2103        debug!(
2104            "QueryEngine::apply_select_items - has_computed_expressions: {}",
2105            has_computed_expressions
2106        );
2107
2108        if !has_computed_expressions {
2109            // Simple case: only columns, use existing projection logic
2110            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2111            return Ok(view.with_columns(column_indices));
2112        }
2113
2114        // Complex case: we have computed expressions
2115        // IMPORTANT: We create a PROJECTED view, not a new table
2116        // This preserves the original DataTable reference
2117
2118        let source_table = view.source();
2119        let visible_rows = view.visible_row_indices();
2120
2121        // Create a temporary table just for the computed result view
2122        // But this table is only used for the current query result
2123        let mut computed_table = DataTable::new("query_result");
2124
2125        // First, expand any Star selectors to actual columns
2126        let mut expanded_items = Vec::new();
2127        for item in select_items {
2128            match item {
2129                SelectItem::Star { table_prefix, .. } => {
2130                    if let Some(prefix) = table_prefix {
2131                        // Scoped expansion: table.* expands only columns from that table
2132                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2133                        for col in &source_table.columns {
2134                            if Self::column_matches_table(col, prefix) {
2135                                expanded_items.push(SelectItem::Column {
2136                                    column: ColumnRef::unquoted(col.name.clone()),
2137                                    leading_comments: vec![],
2138                                    trailing_comment: None,
2139                                });
2140                            }
2141                        }
2142                    } else {
2143                        // Unscoped expansion: * expands to all columns
2144                        debug!("QueryEngine::apply_select_items - expanding *");
2145                        for col_name in source_table.column_names() {
2146                            expanded_items.push(SelectItem::Column {
2147                                column: ColumnRef::unquoted(col_name.to_string()),
2148                                leading_comments: vec![],
2149                                trailing_comment: None,
2150                            });
2151                        }
2152                    }
2153                }
2154                _ => expanded_items.push(item.clone()),
2155            }
2156        }
2157
2158        // Add columns based on expanded SelectItems, handling duplicates
2159        let mut column_name_counts: std::collections::HashMap<String, usize> =
2160            std::collections::HashMap::new();
2161
2162        for item in &expanded_items {
2163            let base_name = match item {
2164                SelectItem::Column {
2165                    column: col_ref, ..
2166                } => col_ref.name.clone(),
2167                SelectItem::Expression { alias, .. } => alias.clone(),
2168                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2169                SelectItem::StarExclude { .. } => {
2170                    unreachable!("StarExclude should have been expanded")
2171                }
2172            };
2173
2174            // Check if this column name has been used before
2175            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2176            let column_name = if *count == 0 {
2177                // First occurrence, use the name as-is
2178                base_name.clone()
2179            } else {
2180                // Duplicate, append a suffix
2181                format!("{base_name}_{count}")
2182            };
2183            *count += 1;
2184
2185            computed_table.add_column(DataColumn::new(&column_name));
2186        }
2187
2188        // Check if batch evaluation can be used
2189        // Batch evaluation is the default but we need to check if all window functions
2190        // are standalone (not embedded in expressions)
2191        let can_use_batch = expanded_items.iter().all(|item| {
2192            match item {
2193                SelectItem::Expression { expr, .. } => {
2194                    // Only use batch evaluation if the expression IS a window function,
2195                    // not if it CONTAINS a window function
2196                    matches!(expr, SqlExpression::WindowFunction { .. })
2197                        || !Self::contains_window_function(expr)
2198                }
2199                _ => true, // Non-expressions are fine
2200            }
2201        });
2202
2203        // Batch evaluation is now the default mode for improved performance
2204        // Users can opt-out by setting SQL_CLI_BATCH_WINDOW=0 or false
2205        let use_batch_evaluation = can_use_batch
2206            && std::env::var("SQL_CLI_BATCH_WINDOW")
2207                .map(|v| v != "0" && v.to_lowercase() != "false")
2208                .unwrap_or(true);
2209
2210        // Store window specs for batch evaluation if needed
2211        let batch_window_specs = if use_batch_evaluation && has_window_functions {
2212            debug!("BATCH window function evaluation flag is enabled");
2213            // Extract window specs before timing starts
2214            let specs = Self::extract_window_specs(&expanded_items);
2215            debug!(
2216                "Extracted {} window function specs for batch evaluation",
2217                specs.len()
2218            );
2219            Some(specs)
2220        } else {
2221            None
2222        };
2223
2224        // Calculate values for each row
2225        let mut evaluator =
2226            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2227
2228        // Populate table aliases from exec_context if available
2229        if let Some(exec_ctx) = exec_context {
2230            let aliases = exec_ctx.get_aliases();
2231            if !aliases.is_empty() {
2232                debug!(
2233                    "Applying {} aliases to evaluator: {:?}",
2234                    aliases.len(),
2235                    aliases
2236                );
2237                evaluator = evaluator.with_table_aliases(aliases);
2238            }
2239        }
2240
2241        // OPTIMIZATION: Pre-create WindowContexts before the row loop
2242        // This avoids 50,000+ redundant context lookups
2243        if has_window_functions {
2244            let preload_start = Instant::now();
2245
2246            // Extract all unique WindowSpecs from SELECT items
2247            let mut window_specs = Vec::new();
2248            for item in &expanded_items {
2249                if let SelectItem::Expression { expr, .. } = item {
2250                    Self::collect_window_specs(expr, &mut window_specs);
2251                }
2252            }
2253
2254            // Pre-create all WindowContexts
2255            for spec in &window_specs {
2256                let _ = evaluator.get_or_create_window_context(spec);
2257            }
2258
2259            debug!(
2260                "Pre-created {} WindowContext(s) in {:.2}ms",
2261                window_specs.len(),
2262                preload_start.elapsed().as_secs_f64() * 1000.0
2263            );
2264        }
2265
2266        // Batch evaluation path for window functions
2267        if let Some(window_specs) = batch_window_specs {
2268            debug!("Starting batch window function evaluation");
2269            let batch_start = Instant::now();
2270
2271            // Initialize result table with all rows
2272            let mut batch_results: Vec<Vec<DataValue>> =
2273                vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2274
2275            // Use the window specs we extracted earlier
2276            let detailed_window_specs = &window_specs;
2277
2278            // Group window specs by their WindowSpec for batch processing
2279            let mut specs_by_window: HashMap<
2280                u64,
2281                Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2282            > = HashMap::new();
2283            for spec in detailed_window_specs {
2284                let hash = spec.spec.compute_hash();
2285                specs_by_window
2286                    .entry(hash)
2287                    .or_insert_with(Vec::new)
2288                    .push(spec);
2289            }
2290
2291            // Process each unique window specification
2292            for (_window_hash, specs) in specs_by_window {
2293                // Get the window context (already pre-created)
2294                let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2295
2296                // Process each function using this window
2297                for spec in specs {
2298                    match spec.function_name.as_str() {
2299                        "LAG" => {
2300                            // Extract column and offset from arguments
2301                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2302                                let column_name = col_ref.name.as_str();
2303                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2304                                    spec.args.get(1)
2305                                {
2306                                    n.parse::<i64>().unwrap_or(1)
2307                                } else {
2308                                    1 // default offset
2309                                };
2310
2311                                let values = context.evaluate_lag_batch(
2312                                    visible_rows,
2313                                    column_name,
2314                                    offset,
2315                                )?;
2316
2317                                // Write results to the output column
2318                                for (row_idx, value) in values.into_iter().enumerate() {
2319                                    batch_results[row_idx][spec.output_column_index] = value;
2320                                }
2321                            }
2322                        }
2323                        "LEAD" => {
2324                            // Extract column and offset from arguments
2325                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2326                                let column_name = col_ref.name.as_str();
2327                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2328                                    spec.args.get(1)
2329                                {
2330                                    n.parse::<i64>().unwrap_or(1)
2331                                } else {
2332                                    1 // default offset
2333                                };
2334
2335                                let values = context.evaluate_lead_batch(
2336                                    visible_rows,
2337                                    column_name,
2338                                    offset,
2339                                )?;
2340
2341                                // Write results to the output column
2342                                for (row_idx, value) in values.into_iter().enumerate() {
2343                                    batch_results[row_idx][spec.output_column_index] = value;
2344                                }
2345                            }
2346                        }
2347                        "ROW_NUMBER" => {
2348                            let values = context.evaluate_row_number_batch(visible_rows)?;
2349
2350                            // Write results to the output column
2351                            for (row_idx, value) in values.into_iter().enumerate() {
2352                                batch_results[row_idx][spec.output_column_index] = value;
2353                            }
2354                        }
2355                        "RANK" => {
2356                            let values = context.evaluate_rank_batch(visible_rows)?;
2357
2358                            // Write results to the output column
2359                            for (row_idx, value) in values.into_iter().enumerate() {
2360                                batch_results[row_idx][spec.output_column_index] = value;
2361                            }
2362                        }
2363                        "DENSE_RANK" => {
2364                            let values = context.evaluate_dense_rank_batch(visible_rows)?;
2365
2366                            // Write results to the output column
2367                            for (row_idx, value) in values.into_iter().enumerate() {
2368                                batch_results[row_idx][spec.output_column_index] = value;
2369                            }
2370                        }
2371                        "SUM" => {
2372                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2373                                let column_name = col_ref.name.as_str();
2374                                let values =
2375                                    context.evaluate_sum_batch(visible_rows, column_name)?;
2376
2377                                for (row_idx, value) in values.into_iter().enumerate() {
2378                                    batch_results[row_idx][spec.output_column_index] = value;
2379                                }
2380                            }
2381                        }
2382                        "AVG" => {
2383                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2384                                let column_name = col_ref.name.as_str();
2385                                let values =
2386                                    context.evaluate_avg_batch(visible_rows, column_name)?;
2387
2388                                for (row_idx, value) in values.into_iter().enumerate() {
2389                                    batch_results[row_idx][spec.output_column_index] = value;
2390                                }
2391                            }
2392                        }
2393                        "MIN" => {
2394                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2395                                let column_name = col_ref.name.as_str();
2396                                let values =
2397                                    context.evaluate_min_batch(visible_rows, column_name)?;
2398
2399                                for (row_idx, value) in values.into_iter().enumerate() {
2400                                    batch_results[row_idx][spec.output_column_index] = value;
2401                                }
2402                            }
2403                        }
2404                        "MAX" => {
2405                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2406                                let column_name = col_ref.name.as_str();
2407                                let values =
2408                                    context.evaluate_max_batch(visible_rows, column_name)?;
2409
2410                                for (row_idx, value) in values.into_iter().enumerate() {
2411                                    batch_results[row_idx][spec.output_column_index] = value;
2412                                }
2413                            }
2414                        }
2415                        "COUNT" => {
2416                            // COUNT can be COUNT(*) or COUNT(column)
2417                            let column_name = match spec.args.get(0) {
2418                                Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2419                                Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2420                                _ => None,
2421                            };
2422
2423                            let values = context.evaluate_count_batch(visible_rows, column_name)?;
2424
2425                            for (row_idx, value) in values.into_iter().enumerate() {
2426                                batch_results[row_idx][spec.output_column_index] = value;
2427                            }
2428                        }
2429                        "FIRST_VALUE" => {
2430                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2431                                let column_name = col_ref.name.as_str();
2432                                let values = context
2433                                    .evaluate_first_value_batch(visible_rows, column_name)?;
2434
2435                                for (row_idx, value) in values.into_iter().enumerate() {
2436                                    batch_results[row_idx][spec.output_column_index] = value;
2437                                }
2438                            }
2439                        }
2440                        "LAST_VALUE" => {
2441                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2442                                let column_name = col_ref.name.as_str();
2443                                let values =
2444                                    context.evaluate_last_value_batch(visible_rows, column_name)?;
2445
2446                                for (row_idx, value) in values.into_iter().enumerate() {
2447                                    batch_results[row_idx][spec.output_column_index] = value;
2448                                }
2449                            }
2450                        }
2451                        _ => {
2452                            // Fall back to per-row evaluation for unsupported functions
2453                            debug!(
2454                                "Window function {} not supported in batch mode, using per-row",
2455                                spec.function_name
2456                            );
2457                        }
2458                    }
2459                }
2460            }
2461
2462            // Now evaluate non-window columns
2463            for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2464                for (col_idx, item) in expanded_items.iter().enumerate() {
2465                    // Skip if this column was already filled by a window function
2466                    if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2467                        continue;
2468                    }
2469
2470                    let value = match item {
2471                        SelectItem::Column {
2472                            column: col_ref, ..
2473                        } => {
2474                            match evaluator
2475                                .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2476                            {
2477                                Ok(val) => val,
2478                                Err(e) => {
2479                                    return Err(anyhow!(
2480                                        "Failed to evaluate column {}: {}",
2481                                        col_ref.to_sql(),
2482                                        e
2483                                    ));
2484                                }
2485                            }
2486                        }
2487                        SelectItem::Expression { expr, .. } => {
2488                            // For batch evaluation, we need to handle expressions differently
2489                            // If this is just a window function by itself, we already computed it
2490                            if matches!(expr, SqlExpression::WindowFunction { .. }) {
2491                                // Pure window function - already handled in batch evaluation
2492                                continue;
2493                            }
2494                            // For expressions containing window functions or regular expressions,
2495                            // evaluate them normally
2496                            evaluator.evaluate(&expr, source_row_idx)?
2497                        }
2498                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2499                        SelectItem::StarExclude { .. } => {
2500                            unreachable!("StarExclude should have been expanded")
2501                        }
2502                    };
2503                    batch_results[result_row_idx][col_idx] = value;
2504                }
2505            }
2506
2507            // Add all rows to the table
2508            for row_values in batch_results {
2509                computed_table
2510                    .add_row(DataRow::new(row_values))
2511                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2512            }
2513
2514            debug!(
2515                "Batch window evaluation completed in {:.3}ms",
2516                batch_start.elapsed().as_secs_f64() * 1000.0
2517            );
2518        } else {
2519            // Original per-row evaluation path
2520            for &row_idx in visible_rows {
2521                let mut row_values = Vec::new();
2522
2523                for item in &expanded_items {
2524                    let value = match item {
2525                        SelectItem::Column {
2526                            column: col_ref, ..
2527                        } => {
2528                            // Use evaluator for column resolution (handles aliases properly)
2529                            match evaluator
2530                                .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2531                            {
2532                                Ok(val) => val,
2533                                Err(e) => {
2534                                    return Err(anyhow!(
2535                                        "Failed to evaluate column {}: {}",
2536                                        col_ref.to_sql(),
2537                                        e
2538                                    ));
2539                                }
2540                            }
2541                        }
2542                        SelectItem::Expression { expr, .. } => {
2543                            // Computed expression
2544                            evaluator.evaluate(&expr, row_idx)?
2545                        }
2546                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2547                        SelectItem::StarExclude { .. } => {
2548                            unreachable!("StarExclude should have been expanded")
2549                        }
2550                    };
2551                    row_values.push(value);
2552                }
2553
2554                computed_table
2555                    .add_row(DataRow::new(row_values))
2556                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2557            }
2558        }
2559
2560        // Log window function timing if applicable
2561        if let Some(start) = window_start {
2562            let window_duration = start.elapsed();
2563            info!(
2564                "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2565                window_duration.as_secs_f64() * 1000.0,
2566                visible_rows.len(),
2567                window_func_count
2568            );
2569
2570            // Add to execution plan
2571            plan.begin_step(
2572                StepType::WindowFunction,
2573                format!("Evaluate {} window function(s)", window_func_count),
2574            );
2575            plan.set_rows_in(visible_rows.len());
2576            plan.set_rows_out(visible_rows.len());
2577            plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2578            plan.add_detail(format!("{} window functions evaluated", window_func_count));
2579            plan.add_detail(format!(
2580                "Evaluation time: {:.3}ms",
2581                window_duration.as_secs_f64() * 1000.0
2582            ));
2583            plan.end_step();
2584        }
2585
2586        // Return a view of the computed result
2587        // This is a temporary view for this query only
2588        Ok(DataView::new(Arc::new(computed_table)))
2589    }
2590
2591    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
2592    fn apply_select_with_row_expansion(
2593        &self,
2594        view: DataView,
2595        select_items: &[SelectItem],
2596    ) -> Result<DataView> {
2597        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2598
2599        let source_table = view.source();
2600        let visible_rows = view.visible_row_indices();
2601        let expander_registry = RowExpanderRegistry::new();
2602
2603        // Create result table
2604        let mut result_table = DataTable::new("unnest_result");
2605
2606        // Expand * to columns and set up result columns
2607        let mut expanded_items = Vec::new();
2608        for item in select_items {
2609            match item {
2610                SelectItem::Star { table_prefix, .. } => {
2611                    if let Some(prefix) = table_prefix {
2612                        // Scoped expansion: table.* expands only columns from that table
2613                        debug!(
2614                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2615                            prefix
2616                        );
2617                        for col in &source_table.columns {
2618                            if Self::column_matches_table(col, prefix) {
2619                                expanded_items.push(SelectItem::Column {
2620                                    column: ColumnRef::unquoted(col.name.clone()),
2621                                    leading_comments: vec![],
2622                                    trailing_comment: None,
2623                                });
2624                            }
2625                        }
2626                    } else {
2627                        // Unscoped expansion: * expands to all columns
2628                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2629                        for col_name in source_table.column_names() {
2630                            expanded_items.push(SelectItem::Column {
2631                                column: ColumnRef::unquoted(col_name.to_string()),
2632                                leading_comments: vec![],
2633                                trailing_comment: None,
2634                            });
2635                        }
2636                    }
2637                }
2638                _ => expanded_items.push(item.clone()),
2639            }
2640        }
2641
2642        // Add columns to result table
2643        for item in &expanded_items {
2644            let column_name = match item {
2645                SelectItem::Column {
2646                    column: col_ref, ..
2647                } => col_ref.name.clone(),
2648                SelectItem::Expression { alias, .. } => alias.clone(),
2649                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2650                SelectItem::StarExclude { .. } => {
2651                    unreachable!("StarExclude should have been expanded")
2652                }
2653            };
2654            result_table.add_column(DataColumn::new(&column_name));
2655        }
2656
2657        // Process each input row
2658        let mut evaluator =
2659            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2660
2661        for &row_idx in visible_rows {
2662            // First pass: identify UNNEST expressions and collect their expansion arrays
2663            let mut unnest_expansions = Vec::new();
2664            let mut unnest_indices = Vec::new();
2665
2666            for (col_idx, item) in expanded_items.iter().enumerate() {
2667                if let SelectItem::Expression { expr, .. } = item {
2668                    if let Some(expansion_result) = self.try_expand_unnest(
2669                        &expr,
2670                        source_table,
2671                        row_idx,
2672                        &mut evaluator,
2673                        &expander_registry,
2674                    )? {
2675                        unnest_expansions.push(expansion_result);
2676                        unnest_indices.push(col_idx);
2677                    }
2678                }
2679            }
2680
2681            // Determine how many output rows to generate
2682            let expansion_count = if unnest_expansions.is_empty() {
2683                1 // No UNNEST, just one row
2684            } else {
2685                unnest_expansions
2686                    .iter()
2687                    .map(|exp| exp.row_count())
2688                    .max()
2689                    .unwrap_or(1)
2690            };
2691
2692            // Generate output rows
2693            for output_idx in 0..expansion_count {
2694                let mut row_values = Vec::new();
2695
2696                for (col_idx, item) in expanded_items.iter().enumerate() {
2697                    // Check if this column is an UNNEST column
2698                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2699
2700                    let value = if let Some(unnest_idx) = unnest_position {
2701                        // Get value from expansion array (or NULL if exhausted)
2702                        let expansion = &unnest_expansions[unnest_idx];
2703                        expansion
2704                            .values
2705                            .get(output_idx)
2706                            .cloned()
2707                            .unwrap_or(DataValue::Null)
2708                    } else {
2709                        // Regular column or non-UNNEST expression - replicate from input
2710                        match item {
2711                            SelectItem::Column {
2712                                column: col_ref, ..
2713                            } => {
2714                                let col_idx =
2715                                    source_table.get_column_index(&col_ref.name).ok_or_else(
2716                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2717                                    )?;
2718                                let row = source_table
2719                                    .get_row(row_idx)
2720                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2721                                row.get(col_idx)
2722                                    .ok_or_else(|| {
2723                                        anyhow::anyhow!("Column {} not found in row", col_idx)
2724                                    })?
2725                                    .clone()
2726                            }
2727                            SelectItem::Expression { expr, .. } => {
2728                                // Non-UNNEST expression - evaluate once and replicate
2729                                evaluator.evaluate(&expr, row_idx)?
2730                            }
2731                            SelectItem::Star { .. } => unreachable!(),
2732                            SelectItem::StarExclude { .. } => {
2733                                unreachable!("StarExclude should have been expanded")
2734                            }
2735                        }
2736                    };
2737
2738                    row_values.push(value);
2739                }
2740
2741                result_table
2742                    .add_row(DataRow::new(row_values))
2743                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2744            }
2745        }
2746
2747        debug!(
2748            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2749            visible_rows.len(),
2750            result_table.row_count()
2751        );
2752
2753        Ok(DataView::new(Arc::new(result_table)))
2754    }
2755
2756    /// Try to expand an expression if it's an UNNEST call
2757    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
2758    fn try_expand_unnest(
2759        &self,
2760        expr: &SqlExpression,
2761        _source_table: &DataTable,
2762        row_idx: usize,
2763        evaluator: &mut ArithmeticEvaluator,
2764        expander_registry: &RowExpanderRegistry,
2765    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2766        // Check for UNNEST variant (direct syntax)
2767        if let SqlExpression::Unnest { column, delimiter } = expr {
2768            // Evaluate the column expression
2769            let column_value = evaluator.evaluate(column, row_idx)?;
2770
2771            // Delimiter is already a string literal
2772            let delimiter_value = DataValue::String(delimiter.clone());
2773
2774            // Get the UNNEST expander
2775            let expander = expander_registry
2776                .get("UNNEST")
2777                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2778
2779            // Expand the value
2780            let expansion = expander.expand(&column_value, &[delimiter_value])?;
2781            return Ok(Some(expansion));
2782        }
2783
2784        // Also check for FunctionCall form (for compatibility)
2785        if let SqlExpression::FunctionCall { name, args, .. } = expr {
2786            if name.to_uppercase() == "UNNEST" {
2787                // UNNEST(column, delimiter)
2788                if args.len() != 2 {
2789                    return Err(anyhow::anyhow!(
2790                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2791                    ));
2792                }
2793
2794                // Evaluate the column expression (first arg)
2795                let column_value = evaluator.evaluate(&args[0], row_idx)?;
2796
2797                // Evaluate the delimiter expression (second arg)
2798                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2799
2800                // Get the UNNEST expander
2801                let expander = expander_registry
2802                    .get("UNNEST")
2803                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2804
2805                // Expand the value
2806                let expansion = expander.expand(&column_value, &[delimiter_value])?;
2807                return Ok(Some(expansion));
2808            }
2809        }
2810
2811        Ok(None)
2812    }
2813
2814    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
2815    fn apply_aggregate_select(
2816        &self,
2817        view: DataView,
2818        select_items: &[SelectItem],
2819    ) -> Result<DataView> {
2820        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2821
2822        let source_table = view.source();
2823        let mut result_table = DataTable::new("aggregate_result");
2824
2825        // Add columns for each select item
2826        for item in select_items {
2827            let column_name = match item {
2828                SelectItem::Expression { alias, .. } => alias.clone(),
2829                _ => unreachable!("Should only have expressions in aggregate-only query"),
2830            };
2831            result_table.add_column(DataColumn::new(&column_name));
2832        }
2833
2834        // Create evaluator with visible rows from the view (for filtered aggregates)
2835        let visible_rows = view.visible_row_indices().to_vec();
2836        let mut evaluator =
2837            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2838                .with_visible_rows(visible_rows);
2839
2840        // Evaluate each aggregate expression once (they handle all rows internally)
2841        let mut row_values = Vec::new();
2842        for item in select_items {
2843            match item {
2844                SelectItem::Expression { expr, .. } => {
2845                    // The evaluator will handle aggregates over all rows
2846                    // We pass row_index=0 but aggregates ignore it and process all rows
2847                    let value = evaluator.evaluate(expr, 0)?;
2848                    row_values.push(value);
2849                }
2850                _ => unreachable!("Should only have expressions in aggregate-only query"),
2851            }
2852        }
2853
2854        // Add the single result row
2855        result_table
2856            .add_row(DataRow::new(row_values))
2857            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2858
2859        Ok(DataView::new(Arc::new(result_table)))
2860    }
2861
2862    /// Check if a column belongs to a specific table based on source_table or qualified_name
2863    ///
2864    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2865    /// to filter which columns should be included.
2866    ///
2867    /// # Arguments
2868    /// * `col` - The column to check
2869    /// * `table_name` - The table name or alias to match against
2870    ///
2871    /// # Returns
2872    /// `true` if the column belongs to the specified table
2873    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2874        // First, check the source_table field
2875        if let Some(ref source) = col.source_table {
2876            // Direct match or matches with schema qualification
2877            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2878                return true;
2879            }
2880        }
2881
2882        // Second, check the qualified_name field
2883        if let Some(ref qualified) = col.qualified_name {
2884            // Check if qualified name starts with "table_name."
2885            if qualified.starts_with(&format!("{}.", table_name)) {
2886                return true;
2887            }
2888        }
2889
2890        false
2891    }
2892
2893    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2894    fn resolve_select_columns(
2895        &self,
2896        table: &DataTable,
2897        select_items: &[SelectItem],
2898    ) -> Result<Vec<usize>> {
2899        let mut indices = Vec::new();
2900        let table_columns = table.column_names();
2901
2902        for item in select_items {
2903            match item {
2904                SelectItem::Column {
2905                    column: col_ref, ..
2906                } => {
2907                    // Check if this has a table prefix
2908                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2909                        // For qualified references, ONLY try qualified lookup - no fallback
2910                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2911                        table.find_column_by_qualified_name(&qualified_name)
2912                            .ok_or_else(|| {
2913                                // Check if any columns have qualified names for better error message
2914                                let has_qualified = table.columns.iter()
2915                                    .any(|c| c.qualified_name.is_some());
2916                                if !has_qualified {
2917                                    anyhow::anyhow!(
2918                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2919                                        qualified_name, table_prefix
2920                                    )
2921                                } else {
2922                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2923                                }
2924                            })?
2925                    } else {
2926                        // Simple column name lookup
2927                        table_columns
2928                            .iter()
2929                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2930                            .ok_or_else(|| {
2931                                let suggestion = self.find_similar_column(table, &col_ref.name);
2932                                match suggestion {
2933                                    Some(similar) => anyhow::anyhow!(
2934                                        "Column '{}' not found. Did you mean '{}'?",
2935                                        col_ref.name,
2936                                        similar
2937                                    ),
2938                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2939                                }
2940                            })?
2941                    };
2942                    indices.push(index);
2943                }
2944                SelectItem::Star { table_prefix, .. } => {
2945                    if let Some(prefix) = table_prefix {
2946                        // Scoped expansion: table.* expands only columns from that table
2947                        for (i, col) in table.columns.iter().enumerate() {
2948                            if Self::column_matches_table(col, prefix) {
2949                                indices.push(i);
2950                            }
2951                        }
2952                    } else {
2953                        // Unscoped expansion: * expands to all column indices
2954                        for i in 0..table_columns.len() {
2955                            indices.push(i);
2956                        }
2957                    }
2958                }
2959                SelectItem::StarExclude {
2960                    table_prefix,
2961                    excluded_columns,
2962                    ..
2963                } => {
2964                    // Expand all columns (with optional table prefix), then exclude specified ones
2965                    if let Some(prefix) = table_prefix {
2966                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2967                        for (i, col) in table.columns.iter().enumerate() {
2968                            if Self::column_matches_table(col, prefix)
2969                                && !excluded_columns.contains(&col.name)
2970                            {
2971                                indices.push(i);
2972                            }
2973                        }
2974                    } else {
2975                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2976                        for (i, col_name) in table_columns.iter().enumerate() {
2977                            if !excluded_columns
2978                                .iter()
2979                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2980                            {
2981                                indices.push(i);
2982                            }
2983                        }
2984                    }
2985                }
2986                SelectItem::Expression { .. } => {
2987                    return Err(anyhow::anyhow!(
2988                        "Computed expressions require new table creation"
2989                    ));
2990                }
2991            }
2992        }
2993
2994        Ok(indices)
2995    }
2996
2997    /// Apply DISTINCT to remove duplicate rows
2998    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2999        use std::collections::HashSet;
3000
3001        let source = view.source();
3002        let visible_cols = view.visible_column_indices();
3003        let visible_rows = view.visible_row_indices();
3004
3005        // Build a set to track unique rows
3006        let mut seen_rows = HashSet::new();
3007        let mut unique_row_indices = Vec::new();
3008
3009        for &row_idx in visible_rows {
3010            // Build a key representing this row's visible column values
3011            let mut row_key = Vec::new();
3012            for &col_idx in visible_cols {
3013                let value = source
3014                    .get_value(row_idx, col_idx)
3015                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
3016                // Convert value to a hashable representation
3017                row_key.push(format!("{:?}", value));
3018            }
3019
3020            // Check if we've seen this row before
3021            if seen_rows.insert(row_key) {
3022                // First time seeing this row combination
3023                unique_row_indices.push(row_idx);
3024            }
3025        }
3026
3027        // Create a new view with only unique rows
3028        Ok(view.with_rows(unique_row_indices))
3029    }
3030
3031    /// Apply multi-column ORDER BY sorting to the view
3032    fn apply_multi_order_by(
3033        &self,
3034        view: DataView,
3035        order_by_columns: &[OrderByItem],
3036    ) -> Result<DataView> {
3037        self.apply_multi_order_by_with_context(view, order_by_columns, None)
3038    }
3039
3040    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
3041    fn apply_multi_order_by_with_context(
3042        &self,
3043        mut view: DataView,
3044        order_by_columns: &[OrderByItem],
3045        _exec_context: Option<&ExecutionContext>,
3046    ) -> Result<DataView> {
3047        // Build list of (source_column_index, ascending) tuples
3048        let mut sort_columns = Vec::new();
3049
3050        for order_col in order_by_columns {
3051            // Extract column name from expression (currently only supports simple columns)
3052            let column_name = match &order_col.expr {
3053                SqlExpression::Column(col_ref) => col_ref.name.clone(),
3054                _ => {
3055                    // TODO: Support expression evaluation in ORDER BY
3056                    return Err(anyhow!(
3057                        "ORDER BY expressions not yet supported - only simple columns allowed"
3058                    ));
3059                }
3060            };
3061
3062            // Try to find the column index, handling qualified column names (table.column)
3063            let col_index = if column_name.contains('.') {
3064                // Qualified column name - extract unqualified part
3065                if let Some(dot_pos) = column_name.rfind('.') {
3066                    let col_name = &column_name[dot_pos + 1..];
3067
3068                    // After SELECT processing, columns are unqualified
3069                    // So just use the column name part
3070                    debug!(
3071                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
3072                        col_name, column_name
3073                    );
3074                    view.source().get_column_index(col_name)
3075                } else {
3076                    view.source().get_column_index(&column_name)
3077                }
3078            } else {
3079                // Simple column name
3080                view.source().get_column_index(&column_name)
3081            }
3082            .ok_or_else(|| {
3083                // If not found, provide helpful error with suggestions
3084                let suggestion = self.find_similar_column(view.source(), &column_name);
3085                match suggestion {
3086                    Some(similar) => anyhow::anyhow!(
3087                        "Column '{}' not found. Did you mean '{}'?",
3088                        column_name,
3089                        similar
3090                    ),
3091                    None => {
3092                        // Also list available columns for debugging
3093                        let available_cols = view.source().column_names().join(", ");
3094                        anyhow::anyhow!(
3095                            "Column '{}' not found. Available columns: {}",
3096                            column_name,
3097                            available_cols
3098                        )
3099                    }
3100                }
3101            })?;
3102
3103            let ascending = matches!(order_col.direction, SortDirection::Asc);
3104            sort_columns.push((col_index, ascending));
3105        }
3106
3107        // Apply multi-column sorting
3108        view.apply_multi_sort(&sort_columns)?;
3109        Ok(view)
3110    }
3111
3112    /// Apply GROUP BY to the view with optional HAVING clause
3113    fn apply_group_by(
3114        &self,
3115        view: DataView,
3116        group_by_exprs: &[SqlExpression],
3117        select_items: &[SelectItem],
3118        having: Option<&SqlExpression>,
3119        plan: &mut ExecutionPlanBuilder,
3120    ) -> Result<DataView> {
3121        // Use the new expression-based GROUP BY implementation
3122        let (result_view, phase_info) = self.apply_group_by_expressions(
3123            view,
3124            group_by_exprs,
3125            select_items,
3126            having,
3127            self.case_insensitive,
3128            self.date_notation.clone(),
3129        )?;
3130
3131        // Add detailed phase information to the execution plan
3132        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3133        plan.add_detail(format!(
3134            "Phase 1 - Group Building: {:.3}ms",
3135            phase_info.phase2_key_building.as_secs_f64() * 1000.0
3136        ));
3137        plan.add_detail(format!(
3138            "  • Processing {} rows into {} groups",
3139            phase_info.total_rows, phase_info.num_groups
3140        ));
3141        plan.add_detail(format!(
3142            "Phase 2 - Aggregation: {:.3}ms",
3143            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3144        ));
3145        if phase_info.phase4_having_evaluation > Duration::ZERO {
3146            plan.add_detail(format!(
3147                "Phase 3 - HAVING Filter: {:.3}ms",
3148                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3149            ));
3150            plan.add_detail(format!(
3151                "  • Filtered {} groups",
3152                phase_info.groups_filtered_by_having
3153            ));
3154        }
3155        plan.add_detail(format!(
3156            "Total GROUP BY time: {:.3}ms",
3157            phase_info.total_time.as_secs_f64() * 1000.0
3158        ));
3159
3160        Ok(result_view)
3161    }
3162
3163    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
3164    /// This helps pre-size hash tables for better performance
3165    pub fn estimate_group_cardinality(
3166        &self,
3167        view: &DataView,
3168        group_by_exprs: &[SqlExpression],
3169    ) -> usize {
3170        // If we have few rows, just return the row count as upper bound
3171        let row_count = view.get_visible_rows().len();
3172        if row_count <= 100 {
3173            return row_count;
3174        }
3175
3176        // Sample first 1000 rows or 10% of data, whichever is smaller
3177        let sample_size = min(1000, row_count / 10).max(100);
3178        let mut seen = FxHashSet::default();
3179
3180        let visible_rows = view.get_visible_rows();
3181        for (i, &row_idx) in visible_rows.iter().enumerate() {
3182            if i >= sample_size {
3183                break;
3184            }
3185
3186            // Evaluate GROUP BY expressions for this row
3187            let mut key_values = Vec::new();
3188            for expr in group_by_exprs {
3189                let mut evaluator = ArithmeticEvaluator::new(view.source());
3190                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3191                key_values.push(value);
3192            }
3193
3194            seen.insert(key_values);
3195        }
3196
3197        // Estimate total cardinality based on sample
3198        let sample_cardinality = seen.len();
3199        let estimated = (sample_cardinality * row_count) / sample_size;
3200
3201        // Cap at row count and ensure minimum of sample cardinality
3202        estimated.min(row_count).max(sample_cardinality)
3203    }
3204}
3205
3206#[cfg(test)]
3207mod tests {
3208    use super::*;
3209    use crate::data::datatable::{DataColumn, DataRow, DataValue};
3210
3211    fn create_test_table() -> Arc<DataTable> {
3212        let mut table = DataTable::new("test");
3213
3214        // Add columns
3215        table.add_column(DataColumn::new("id"));
3216        table.add_column(DataColumn::new("name"));
3217        table.add_column(DataColumn::new("age"));
3218
3219        // Add rows
3220        table
3221            .add_row(DataRow::new(vec![
3222                DataValue::Integer(1),
3223                DataValue::String("Alice".to_string()),
3224                DataValue::Integer(30),
3225            ]))
3226            .unwrap();
3227
3228        table
3229            .add_row(DataRow::new(vec![
3230                DataValue::Integer(2),
3231                DataValue::String("Bob".to_string()),
3232                DataValue::Integer(25),
3233            ]))
3234            .unwrap();
3235
3236        table
3237            .add_row(DataRow::new(vec![
3238                DataValue::Integer(3),
3239                DataValue::String("Charlie".to_string()),
3240                DataValue::Integer(35),
3241            ]))
3242            .unwrap();
3243
3244        Arc::new(table)
3245    }
3246
3247    #[test]
3248    fn test_select_all() {
3249        let table = create_test_table();
3250        let engine = QueryEngine::new();
3251
3252        let view = engine
3253            .execute(table.clone(), "SELECT * FROM users")
3254            .unwrap();
3255        assert_eq!(view.row_count(), 3);
3256        assert_eq!(view.column_count(), 3);
3257    }
3258
3259    #[test]
3260    fn test_select_columns() {
3261        let table = create_test_table();
3262        let engine = QueryEngine::new();
3263
3264        let view = engine
3265            .execute(table.clone(), "SELECT name, age FROM users")
3266            .unwrap();
3267        assert_eq!(view.row_count(), 3);
3268        assert_eq!(view.column_count(), 2);
3269    }
3270
3271    #[test]
3272    fn test_select_with_limit() {
3273        let table = create_test_table();
3274        let engine = QueryEngine::new();
3275
3276        let view = engine
3277            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3278            .unwrap();
3279        assert_eq!(view.row_count(), 2);
3280    }
3281
3282    #[test]
3283    fn test_type_coercion_contains() {
3284        // Initialize tracing for debug output
3285        let _ = tracing_subscriber::fmt()
3286            .with_max_level(tracing::Level::DEBUG)
3287            .try_init();
3288
3289        let mut table = DataTable::new("test");
3290        table.add_column(DataColumn::new("id"));
3291        table.add_column(DataColumn::new("status"));
3292        table.add_column(DataColumn::new("price"));
3293
3294        // Add test data with mixed types
3295        table
3296            .add_row(DataRow::new(vec![
3297                DataValue::Integer(1),
3298                DataValue::String("Pending".to_string()),
3299                DataValue::Float(99.99),
3300            ]))
3301            .unwrap();
3302
3303        table
3304            .add_row(DataRow::new(vec![
3305                DataValue::Integer(2),
3306                DataValue::String("Confirmed".to_string()),
3307                DataValue::Float(150.50),
3308            ]))
3309            .unwrap();
3310
3311        table
3312            .add_row(DataRow::new(vec![
3313                DataValue::Integer(3),
3314                DataValue::String("Pending".to_string()),
3315                DataValue::Float(75.00),
3316            ]))
3317            .unwrap();
3318
3319        let table = Arc::new(table);
3320        let engine = QueryEngine::new();
3321
3322        println!("\n=== Testing WHERE clause with Contains ===");
3323        println!("Table has {} rows", table.row_count());
3324        for i in 0..table.row_count() {
3325            let status = table.get_value(i, 1);
3326            println!("Row {i}: status = {status:?}");
3327        }
3328
3329        // Test 1: Basic string contains (should work)
3330        println!("\n--- Test 1: status.Contains('pend') ---");
3331        let result = engine.execute(
3332            table.clone(),
3333            "SELECT * FROM test WHERE status.Contains('pend')",
3334        );
3335        match result {
3336            Ok(view) => {
3337                println!("SUCCESS: Found {} matching rows", view.row_count());
3338                assert_eq!(view.row_count(), 2); // Should find both Pending rows
3339            }
3340            Err(e) => {
3341                panic!("Query failed: {e}");
3342            }
3343        }
3344
3345        // Test 2: Numeric contains (should work with type coercion)
3346        println!("\n--- Test 2: price.Contains('9') ---");
3347        let result = engine.execute(
3348            table.clone(),
3349            "SELECT * FROM test WHERE price.Contains('9')",
3350        );
3351        match result {
3352            Ok(view) => {
3353                println!(
3354                    "SUCCESS: Found {} matching rows with price containing '9'",
3355                    view.row_count()
3356                );
3357                // Should find 99.99 row
3358                assert!(view.row_count() >= 1);
3359            }
3360            Err(e) => {
3361                panic!("Numeric coercion query failed: {e}");
3362            }
3363        }
3364
3365        println!("\n=== All tests passed! ===");
3366    }
3367
3368    #[test]
3369    fn test_not_in_clause() {
3370        // Initialize tracing for debug output
3371        let _ = tracing_subscriber::fmt()
3372            .with_max_level(tracing::Level::DEBUG)
3373            .try_init();
3374
3375        let mut table = DataTable::new("test");
3376        table.add_column(DataColumn::new("id"));
3377        table.add_column(DataColumn::new("country"));
3378
3379        // Add test data
3380        table
3381            .add_row(DataRow::new(vec![
3382                DataValue::Integer(1),
3383                DataValue::String("CA".to_string()),
3384            ]))
3385            .unwrap();
3386
3387        table
3388            .add_row(DataRow::new(vec![
3389                DataValue::Integer(2),
3390                DataValue::String("US".to_string()),
3391            ]))
3392            .unwrap();
3393
3394        table
3395            .add_row(DataRow::new(vec![
3396                DataValue::Integer(3),
3397                DataValue::String("UK".to_string()),
3398            ]))
3399            .unwrap();
3400
3401        let table = Arc::new(table);
3402        let engine = QueryEngine::new();
3403
3404        println!("\n=== Testing NOT IN clause ===");
3405        println!("Table has {} rows", table.row_count());
3406        for i in 0..table.row_count() {
3407            let country = table.get_value(i, 1);
3408            println!("Row {i}: country = {country:?}");
3409        }
3410
3411        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
3412        println!("\n--- Test: country NOT IN ('CA') ---");
3413        let result = engine.execute(
3414            table.clone(),
3415            "SELECT * FROM test WHERE country NOT IN ('CA')",
3416        );
3417        match result {
3418            Ok(view) => {
3419                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3420                assert_eq!(view.row_count(), 2); // Should find US and UK
3421            }
3422            Err(e) => {
3423                panic!("NOT IN query failed: {e}");
3424            }
3425        }
3426
3427        println!("\n=== NOT IN test complete! ===");
3428    }
3429
3430    #[test]
3431    fn test_case_insensitive_in_and_not_in() {
3432        // Initialize tracing for debug output
3433        let _ = tracing_subscriber::fmt()
3434            .with_max_level(tracing::Level::DEBUG)
3435            .try_init();
3436
3437        let mut table = DataTable::new("test");
3438        table.add_column(DataColumn::new("id"));
3439        table.add_column(DataColumn::new("country"));
3440
3441        // Add test data with mixed case
3442        table
3443            .add_row(DataRow::new(vec![
3444                DataValue::Integer(1),
3445                DataValue::String("CA".to_string()), // uppercase
3446            ]))
3447            .unwrap();
3448
3449        table
3450            .add_row(DataRow::new(vec![
3451                DataValue::Integer(2),
3452                DataValue::String("us".to_string()), // lowercase
3453            ]))
3454            .unwrap();
3455
3456        table
3457            .add_row(DataRow::new(vec![
3458                DataValue::Integer(3),
3459                DataValue::String("UK".to_string()), // uppercase
3460            ]))
3461            .unwrap();
3462
3463        let table = Arc::new(table);
3464
3465        println!("\n=== Testing Case-Insensitive IN clause ===");
3466        println!("Table has {} rows", table.row_count());
3467        for i in 0..table.row_count() {
3468            let country = table.get_value(i, 1);
3469            println!("Row {i}: country = {country:?}");
3470        }
3471
3472        // Test case-insensitive IN - should match 'CA' with 'ca'
3473        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3474        let engine = QueryEngine::with_case_insensitive(true);
3475        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3476        match result {
3477            Ok(view) => {
3478                println!(
3479                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3480                    view.row_count()
3481                );
3482                assert_eq!(view.row_count(), 1); // Should find CA row
3483            }
3484            Err(e) => {
3485                panic!("Case-insensitive IN query failed: {e}");
3486            }
3487        }
3488
3489        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
3490        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3491        let result = engine.execute(
3492            table.clone(),
3493            "SELECT * FROM test WHERE country NOT IN ('ca')",
3494        );
3495        match result {
3496            Ok(view) => {
3497                println!(
3498                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3499                    view.row_count()
3500                );
3501                assert_eq!(view.row_count(), 2); // Should find us and UK rows
3502            }
3503            Err(e) => {
3504                panic!("Case-insensitive NOT IN query failed: {e}");
3505            }
3506        }
3507
3508        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
3509        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3510        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
3511        let result = engine_case_sensitive
3512            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3513        match result {
3514            Ok(view) => {
3515                println!(
3516                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3517                    view.row_count()
3518                );
3519                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
3520            }
3521            Err(e) => {
3522                panic!("Case-sensitive IN query failed: {e}");
3523            }
3524        }
3525
3526        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3527    }
3528
3529    #[test]
3530    #[ignore = "Parentheses in WHERE clause not yet implemented"]
3531    fn test_parentheses_in_where_clause() {
3532        // Initialize tracing for debug output
3533        let _ = tracing_subscriber::fmt()
3534            .with_max_level(tracing::Level::DEBUG)
3535            .try_init();
3536
3537        let mut table = DataTable::new("test");
3538        table.add_column(DataColumn::new("id"));
3539        table.add_column(DataColumn::new("status"));
3540        table.add_column(DataColumn::new("priority"));
3541
3542        // Add test data
3543        table
3544            .add_row(DataRow::new(vec![
3545                DataValue::Integer(1),
3546                DataValue::String("Pending".to_string()),
3547                DataValue::String("High".to_string()),
3548            ]))
3549            .unwrap();
3550
3551        table
3552            .add_row(DataRow::new(vec![
3553                DataValue::Integer(2),
3554                DataValue::String("Complete".to_string()),
3555                DataValue::String("High".to_string()),
3556            ]))
3557            .unwrap();
3558
3559        table
3560            .add_row(DataRow::new(vec![
3561                DataValue::Integer(3),
3562                DataValue::String("Pending".to_string()),
3563                DataValue::String("Low".to_string()),
3564            ]))
3565            .unwrap();
3566
3567        table
3568            .add_row(DataRow::new(vec![
3569                DataValue::Integer(4),
3570                DataValue::String("Complete".to_string()),
3571                DataValue::String("Low".to_string()),
3572            ]))
3573            .unwrap();
3574
3575        let table = Arc::new(table);
3576        let engine = QueryEngine::new();
3577
3578        println!("\n=== Testing Parentheses in WHERE clause ===");
3579        println!("Table has {} rows", table.row_count());
3580        for i in 0..table.row_count() {
3581            let status = table.get_value(i, 1);
3582            let priority = table.get_value(i, 2);
3583            println!("Row {i}: status = {status:?}, priority = {priority:?}");
3584        }
3585
3586        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
3587        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3588        let result = engine.execute(
3589            table.clone(),
3590            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3591        );
3592        match result {
3593            Ok(view) => {
3594                println!(
3595                    "SUCCESS: Found {} rows with parenthetical logic",
3596                    view.row_count()
3597                );
3598                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
3599            }
3600            Err(e) => {
3601                panic!("Parentheses query failed: {e}");
3602            }
3603        }
3604
3605        println!("\n=== Parentheses test complete! ===");
3606    }
3607
3608    #[test]
3609    #[ignore = "Numeric type coercion needs fixing"]
3610    fn test_numeric_type_coercion() {
3611        // Initialize tracing for debug output
3612        let _ = tracing_subscriber::fmt()
3613            .with_max_level(tracing::Level::DEBUG)
3614            .try_init();
3615
3616        let mut table = DataTable::new("test");
3617        table.add_column(DataColumn::new("id"));
3618        table.add_column(DataColumn::new("price"));
3619        table.add_column(DataColumn::new("quantity"));
3620
3621        // Add test data with different numeric types
3622        table
3623            .add_row(DataRow::new(vec![
3624                DataValue::Integer(1),
3625                DataValue::Float(99.50), // Contains '.'
3626                DataValue::Integer(100),
3627            ]))
3628            .unwrap();
3629
3630        table
3631            .add_row(DataRow::new(vec![
3632                DataValue::Integer(2),
3633                DataValue::Float(150.0), // Contains '.' and '0'
3634                DataValue::Integer(200),
3635            ]))
3636            .unwrap();
3637
3638        table
3639            .add_row(DataRow::new(vec![
3640                DataValue::Integer(3),
3641                DataValue::Integer(75), // No decimal point
3642                DataValue::Integer(50),
3643            ]))
3644            .unwrap();
3645
3646        let table = Arc::new(table);
3647        let engine = QueryEngine::new();
3648
3649        println!("\n=== Testing Numeric Type Coercion ===");
3650        println!("Table has {} rows", table.row_count());
3651        for i in 0..table.row_count() {
3652            let price = table.get_value(i, 1);
3653            let quantity = table.get_value(i, 2);
3654            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3655        }
3656
3657        // Test Contains on float values - should find rows with decimal points
3658        println!("\n--- Test: price.Contains('.') ---");
3659        let result = engine.execute(
3660            table.clone(),
3661            "SELECT * FROM test WHERE price.Contains('.')",
3662        );
3663        match result {
3664            Ok(view) => {
3665                println!(
3666                    "SUCCESS: Found {} rows with decimal points in price",
3667                    view.row_count()
3668                );
3669                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
3670            }
3671            Err(e) => {
3672                panic!("Numeric Contains query failed: {e}");
3673            }
3674        }
3675
3676        // Test Contains on integer values converted to string
3677        println!("\n--- Test: quantity.Contains('0') ---");
3678        let result = engine.execute(
3679            table.clone(),
3680            "SELECT * FROM test WHERE quantity.Contains('0')",
3681        );
3682        match result {
3683            Ok(view) => {
3684                println!(
3685                    "SUCCESS: Found {} rows with '0' in quantity",
3686                    view.row_count()
3687                );
3688                assert_eq!(view.row_count(), 2); // Should find 100 and 200
3689            }
3690            Err(e) => {
3691                panic!("Integer Contains query failed: {e}");
3692            }
3693        }
3694
3695        println!("\n=== Numeric type coercion test complete! ===");
3696    }
3697
3698    #[test]
3699    fn test_datetime_comparisons() {
3700        // Initialize tracing for debug output
3701        let _ = tracing_subscriber::fmt()
3702            .with_max_level(tracing::Level::DEBUG)
3703            .try_init();
3704
3705        let mut table = DataTable::new("test");
3706        table.add_column(DataColumn::new("id"));
3707        table.add_column(DataColumn::new("created_date"));
3708
3709        // Add test data with date strings (as they would come from CSV)
3710        table
3711            .add_row(DataRow::new(vec![
3712                DataValue::Integer(1),
3713                DataValue::String("2024-12-15".to_string()),
3714            ]))
3715            .unwrap();
3716
3717        table
3718            .add_row(DataRow::new(vec![
3719                DataValue::Integer(2),
3720                DataValue::String("2025-01-15".to_string()),
3721            ]))
3722            .unwrap();
3723
3724        table
3725            .add_row(DataRow::new(vec![
3726                DataValue::Integer(3),
3727                DataValue::String("2025-02-15".to_string()),
3728            ]))
3729            .unwrap();
3730
3731        let table = Arc::new(table);
3732        let engine = QueryEngine::new();
3733
3734        println!("\n=== Testing DateTime Comparisons ===");
3735        println!("Table has {} rows", table.row_count());
3736        for i in 0..table.row_count() {
3737            let date = table.get_value(i, 1);
3738            println!("Row {i}: created_date = {date:?}");
3739        }
3740
3741        // Test DateTime constructor comparison - should find dates after 2025-01-01
3742        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3743        let result = engine.execute(
3744            table.clone(),
3745            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3746        );
3747        match result {
3748            Ok(view) => {
3749                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3750                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
3751            }
3752            Err(e) => {
3753                panic!("DateTime comparison query failed: {e}");
3754            }
3755        }
3756
3757        println!("\n=== DateTime comparison test complete! ===");
3758    }
3759
3760    #[test]
3761    fn test_not_with_method_calls() {
3762        // Initialize tracing for debug output
3763        let _ = tracing_subscriber::fmt()
3764            .with_max_level(tracing::Level::DEBUG)
3765            .try_init();
3766
3767        let mut table = DataTable::new("test");
3768        table.add_column(DataColumn::new("id"));
3769        table.add_column(DataColumn::new("status"));
3770
3771        // Add test data
3772        table
3773            .add_row(DataRow::new(vec![
3774                DataValue::Integer(1),
3775                DataValue::String("Pending Review".to_string()),
3776            ]))
3777            .unwrap();
3778
3779        table
3780            .add_row(DataRow::new(vec![
3781                DataValue::Integer(2),
3782                DataValue::String("Complete".to_string()),
3783            ]))
3784            .unwrap();
3785
3786        table
3787            .add_row(DataRow::new(vec![
3788                DataValue::Integer(3),
3789                DataValue::String("Pending Approval".to_string()),
3790            ]))
3791            .unwrap();
3792
3793        let table = Arc::new(table);
3794        let engine = QueryEngine::with_case_insensitive(true);
3795
3796        println!("\n=== Testing NOT with Method Calls ===");
3797        println!("Table has {} rows", table.row_count());
3798        for i in 0..table.row_count() {
3799            let status = table.get_value(i, 1);
3800            println!("Row {i}: status = {status:?}");
3801        }
3802
3803        // Test NOT with Contains - should exclude rows containing "pend"
3804        println!("\n--- Test: NOT status.Contains('pend') ---");
3805        let result = engine.execute(
3806            table.clone(),
3807            "SELECT * FROM test WHERE NOT status.Contains('pend')",
3808        );
3809        match result {
3810            Ok(view) => {
3811                println!(
3812                    "SUCCESS: Found {} rows NOT containing 'pend'",
3813                    view.row_count()
3814                );
3815                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3816            }
3817            Err(e) => {
3818                panic!("NOT Contains query failed: {e}");
3819            }
3820        }
3821
3822        // Test NOT with StartsWith
3823        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3824        let result = engine.execute(
3825            table.clone(),
3826            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3827        );
3828        match result {
3829            Ok(view) => {
3830                println!(
3831                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
3832                    view.row_count()
3833                );
3834                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3835            }
3836            Err(e) => {
3837                panic!("NOT StartsWith query failed: {e}");
3838            }
3839        }
3840
3841        println!("\n=== NOT with method calls test complete! ===");
3842    }
3843
3844    #[test]
3845    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3846    fn test_complex_logical_expressions() {
3847        // Initialize tracing for debug output
3848        let _ = tracing_subscriber::fmt()
3849            .with_max_level(tracing::Level::DEBUG)
3850            .try_init();
3851
3852        let mut table = DataTable::new("test");
3853        table.add_column(DataColumn::new("id"));
3854        table.add_column(DataColumn::new("status"));
3855        table.add_column(DataColumn::new("priority"));
3856        table.add_column(DataColumn::new("assigned"));
3857
3858        // Add comprehensive test data
3859        table
3860            .add_row(DataRow::new(vec![
3861                DataValue::Integer(1),
3862                DataValue::String("Pending".to_string()),
3863                DataValue::String("High".to_string()),
3864                DataValue::String("John".to_string()),
3865            ]))
3866            .unwrap();
3867
3868        table
3869            .add_row(DataRow::new(vec![
3870                DataValue::Integer(2),
3871                DataValue::String("Complete".to_string()),
3872                DataValue::String("High".to_string()),
3873                DataValue::String("Jane".to_string()),
3874            ]))
3875            .unwrap();
3876
3877        table
3878            .add_row(DataRow::new(vec![
3879                DataValue::Integer(3),
3880                DataValue::String("Pending".to_string()),
3881                DataValue::String("Low".to_string()),
3882                DataValue::String("John".to_string()),
3883            ]))
3884            .unwrap();
3885
3886        table
3887            .add_row(DataRow::new(vec![
3888                DataValue::Integer(4),
3889                DataValue::String("In Progress".to_string()),
3890                DataValue::String("Medium".to_string()),
3891                DataValue::String("Jane".to_string()),
3892            ]))
3893            .unwrap();
3894
3895        let table = Arc::new(table);
3896        let engine = QueryEngine::new();
3897
3898        println!("\n=== Testing Complex Logical Expressions ===");
3899        println!("Table has {} rows", table.row_count());
3900        for i in 0..table.row_count() {
3901            let status = table.get_value(i, 1);
3902            let priority = table.get_value(i, 2);
3903            let assigned = table.get_value(i, 3);
3904            println!(
3905                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3906            );
3907        }
3908
3909        // Test complex AND/OR logic
3910        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3911        let result = engine.execute(
3912            table.clone(),
3913            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3914        );
3915        match result {
3916            Ok(view) => {
3917                println!(
3918                    "SUCCESS: Found {} rows with complex logic",
3919                    view.row_count()
3920                );
3921                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3922            }
3923            Err(e) => {
3924                panic!("Complex logic query failed: {e}");
3925            }
3926        }
3927
3928        // Test NOT with complex expressions
3929        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3930        let result = engine.execute(
3931            table.clone(),
3932            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3933        );
3934        match result {
3935            Ok(view) => {
3936                println!(
3937                    "SUCCESS: Found {} rows with NOT complex logic",
3938                    view.row_count()
3939                );
3940                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3941            }
3942            Err(e) => {
3943                panic!("NOT complex logic query failed: {e}");
3944            }
3945        }
3946
3947        println!("\n=== Complex logical expressions test complete! ===");
3948    }
3949
3950    #[test]
3951    fn test_mixed_data_types_and_edge_cases() {
3952        // Initialize tracing for debug output
3953        let _ = tracing_subscriber::fmt()
3954            .with_max_level(tracing::Level::DEBUG)
3955            .try_init();
3956
3957        let mut table = DataTable::new("test");
3958        table.add_column(DataColumn::new("id"));
3959        table.add_column(DataColumn::new("value"));
3960        table.add_column(DataColumn::new("nullable_field"));
3961
3962        // Add test data with mixed types and edge cases
3963        table
3964            .add_row(DataRow::new(vec![
3965                DataValue::Integer(1),
3966                DataValue::String("123.45".to_string()),
3967                DataValue::String("present".to_string()),
3968            ]))
3969            .unwrap();
3970
3971        table
3972            .add_row(DataRow::new(vec![
3973                DataValue::Integer(2),
3974                DataValue::Float(678.90),
3975                DataValue::Null,
3976            ]))
3977            .unwrap();
3978
3979        table
3980            .add_row(DataRow::new(vec![
3981                DataValue::Integer(3),
3982                DataValue::Boolean(true),
3983                DataValue::String("also present".to_string()),
3984            ]))
3985            .unwrap();
3986
3987        table
3988            .add_row(DataRow::new(vec![
3989                DataValue::Integer(4),
3990                DataValue::String("false".to_string()),
3991                DataValue::Null,
3992            ]))
3993            .unwrap();
3994
3995        let table = Arc::new(table);
3996        let engine = QueryEngine::new();
3997
3998        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3999        println!("Table has {} rows", table.row_count());
4000        for i in 0..table.row_count() {
4001            let value = table.get_value(i, 1);
4002            let nullable = table.get_value(i, 2);
4003            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
4004        }
4005
4006        // Test type coercion with boolean Contains
4007        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
4008        let result = engine.execute(
4009            table.clone(),
4010            "SELECT * FROM test WHERE value.Contains('true')",
4011        );
4012        match result {
4013            Ok(view) => {
4014                println!(
4015                    "SUCCESS: Found {} rows with boolean coercion",
4016                    view.row_count()
4017                );
4018                assert_eq!(view.row_count(), 1); // Should find the boolean true row
4019            }
4020            Err(e) => {
4021                panic!("Boolean coercion query failed: {e}");
4022            }
4023        }
4024
4025        // Test multiple IN values with mixed types
4026        println!("\n--- Test: id IN (1, 3) ---");
4027        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
4028        match result {
4029            Ok(view) => {
4030                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
4031                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
4032            }
4033            Err(e) => {
4034                panic!("Multiple IN values query failed: {e}");
4035            }
4036        }
4037
4038        println!("\n=== Mixed data types test complete! ===");
4039    }
4040
4041    /// Test that aggregate-only queries return exactly one row (regression test)
4042    #[test]
4043    fn test_aggregate_only_single_row() {
4044        let table = create_test_stock_data();
4045        let engine = QueryEngine::new();
4046
4047        // Test query with multiple aggregates - should return exactly 1 row
4048        let result = engine
4049            .execute(
4050                table.clone(),
4051                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
4052            )
4053            .expect("Query should succeed");
4054
4055        assert_eq!(
4056            result.row_count(),
4057            1,
4058            "Aggregate-only query should return exactly 1 row"
4059        );
4060        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
4061
4062        // Verify the actual values are correct
4063        let source = result.source();
4064        let row = source.get_row(0).expect("Should have first row");
4065
4066        // COUNT(*) should be 5 (total rows)
4067        assert_eq!(row.values[0], DataValue::Integer(5));
4068
4069        // MIN should be 99.5
4070        assert_eq!(row.values[1], DataValue::Float(99.5));
4071
4072        // MAX should be 105.0
4073        assert_eq!(row.values[2], DataValue::Float(105.0));
4074
4075        // AVG should be approximately 102.4
4076        if let DataValue::Float(avg) = &row.values[3] {
4077            assert!(
4078                (avg - 102.4).abs() < 0.01,
4079                "Average should be approximately 102.4, got {}",
4080                avg
4081            );
4082        } else {
4083            panic!("AVG should return a Float value");
4084        }
4085    }
4086
4087    /// Test single aggregate function returns single row
4088    #[test]
4089    fn test_single_aggregate_single_row() {
4090        let table = create_test_stock_data();
4091        let engine = QueryEngine::new();
4092
4093        let result = engine
4094            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4095            .expect("Query should succeed");
4096
4097        assert_eq!(
4098            result.row_count(),
4099            1,
4100            "Single aggregate query should return exactly 1 row"
4101        );
4102        assert_eq!(result.column_count(), 1, "Should have 1 column");
4103
4104        let source = result.source();
4105        let row = source.get_row(0).expect("Should have first row");
4106        assert_eq!(row.values[0], DataValue::Integer(5));
4107    }
4108
4109    /// Test aggregate with WHERE clause filtering
4110    #[test]
4111    fn test_aggregate_with_where_single_row() {
4112        let table = create_test_stock_data();
4113        let engine = QueryEngine::new();
4114
4115        // Filter to only high-value stocks (>= 103.0) and aggregate
4116        let result = engine
4117            .execute(
4118                table.clone(),
4119                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4120            )
4121            .expect("Query should succeed");
4122
4123        assert_eq!(
4124            result.row_count(),
4125            1,
4126            "Filtered aggregate query should return exactly 1 row"
4127        );
4128        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4129
4130        let source = result.source();
4131        let row = source.get_row(0).expect("Should have first row");
4132
4133        // Should find 2 rows (103.5 and 105.0)
4134        assert_eq!(row.values[0], DataValue::Integer(2));
4135        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
4136        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
4137    }
4138
4139    #[test]
4140    fn test_not_in_parsing() {
4141        use crate::sql::recursive_parser::Parser;
4142
4143        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4144        println!("\n=== Testing NOT IN parsing ===");
4145        println!("Parsing query: {query}");
4146
4147        let mut parser = Parser::new(query);
4148        match parser.parse() {
4149            Ok(statement) => {
4150                println!("Parsed statement: {statement:#?}");
4151                if let Some(where_clause) = statement.where_clause {
4152                    println!("WHERE conditions: {:#?}", where_clause.conditions);
4153                    if let Some(first_condition) = where_clause.conditions.first() {
4154                        println!("First condition expression: {:#?}", first_condition.expr);
4155                    }
4156                }
4157            }
4158            Err(e) => {
4159                panic!("Parse error: {e}");
4160            }
4161        }
4162    }
4163
4164    /// Create test stock data for aggregate testing
4165    fn create_test_stock_data() -> Arc<DataTable> {
4166        let mut table = DataTable::new("stock");
4167
4168        table.add_column(DataColumn::new("symbol"));
4169        table.add_column(DataColumn::new("close"));
4170        table.add_column(DataColumn::new("volume"));
4171
4172        // Add 5 rows of test data
4173        let test_data = vec![
4174            ("AAPL", 99.5, 1000),
4175            ("AAPL", 101.2, 1500),
4176            ("AAPL", 103.5, 2000),
4177            ("AAPL", 105.0, 1200),
4178            ("AAPL", 102.8, 1800),
4179        ];
4180
4181        for (symbol, close, volume) in test_data {
4182            table
4183                .add_row(DataRow::new(vec![
4184                    DataValue::String(symbol.to_string()),
4185                    DataValue::Float(close),
4186                    DataValue::Integer(volume),
4187                ]))
4188                .expect("Should add row successfully");
4189        }
4190
4191        Arc::new(table)
4192    }
4193}
4194
4195#[cfg(test)]
4196#[path = "query_engine_tests.rs"]
4197mod query_engine_tests;