sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29    TableFunction,
30};
31
32/// Execution context for tracking table aliases and scope during query execution
33#[derive(Debug, Clone)]
34pub struct ExecutionContext {
35    /// Map from alias to actual table/CTE name
36    /// Example: "t" -> "#tmp_trades", "a" -> "data"
37    alias_map: HashMap<String, String>,
38}
39
40impl ExecutionContext {
41    /// Create a new empty execution context
42    pub fn new() -> Self {
43        Self {
44            alias_map: HashMap::new(),
45        }
46    }
47
48    /// Register a table alias
49    pub fn register_alias(&mut self, alias: String, table_name: String) {
50        debug!("Registering alias: {} -> {}", alias, table_name);
51        self.alias_map.insert(alias, table_name);
52    }
53
54    /// Resolve an alias to its actual table name
55    /// Returns the alias itself if not found in the map
56    pub fn resolve_alias(&self, name: &str) -> String {
57        self.alias_map
58            .get(name)
59            .cloned()
60            .unwrap_or_else(|| name.to_string())
61    }
62
63    /// Check if a name is a registered alias
64    pub fn is_alias(&self, name: &str) -> bool {
65        self.alias_map.contains_key(name)
66    }
67
68    /// Get a copy of all registered aliases
69    pub fn get_aliases(&self) -> HashMap<String, String> {
70        self.alias_map.clone()
71    }
72
73    /// Resolve a column reference to its index in the table, handling aliases
74    ///
75    /// This is the unified column resolution function that should be used by all
76    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
77    /// alias resolution behavior.
78    ///
79    /// Resolution strategy:
80    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
81    ///    a. Resolve the alias: t -> actual_table_name
82    ///    b. Try qualified lookup: "actual_table_name.amount"
83    ///    c. Fall back to unqualified: "amount"
84    /// 2. If column_ref has no prefix:
85    ///    a. Try simple column name lookup: "amount"
86    ///    b. Try as qualified name if it contains a dot: "table.column"
87    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88        if let Some(table_prefix) = &column_ref.table_prefix {
89            // Qualified column reference: resolve the alias first
90            let actual_table = self.resolve_alias(table_prefix);
91
92            // Try qualified lookup: "actual_table.column"
93            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95                debug!(
96                    "Resolved {}.{} -> qualified column '{}' at index {}",
97                    table_prefix, column_ref.name, qualified_name, idx
98                );
99                return Ok(idx);
100            }
101
102            // Fall back to unqualified lookup
103            if let Some(idx) = table.get_column_index(&column_ref.name) {
104                debug!(
105                    "Resolved {}.{} -> unqualified column '{}' at index {}",
106                    table_prefix, column_ref.name, column_ref.name, idx
107                );
108                return Ok(idx);
109            }
110
111            // Not found with either qualified or unqualified name
112            Err(anyhow!(
113                "Column '{}' not found. Table '{}' may not support qualified column names",
114                qualified_name,
115                actual_table
116            ))
117        } else {
118            // Unqualified column reference
119            if let Some(idx) = table.get_column_index(&column_ref.name) {
120                debug!(
121                    "Resolved unqualified column '{}' at index {}",
122                    column_ref.name, idx
123                );
124                return Ok(idx);
125            }
126
127            // If the column name contains a dot, try it as a qualified name
128            if column_ref.name.contains('.') {
129                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130                    debug!(
131                        "Resolved '{}' as qualified column at index {}",
132                        column_ref.name, idx
133                    );
134                    return Ok(idx);
135                }
136            }
137
138            // Column not found - provide helpful error
139            let suggestion = self.find_similar_column(table, &column_ref.name);
140            match suggestion {
141                Some(similar) => Err(anyhow!(
142                    "Column '{}' not found. Did you mean '{}'?",
143                    column_ref.name,
144                    similar
145                )),
146                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147            }
148        }
149    }
150
151    /// Find a similar column name using edit distance (for better error messages)
152    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153        let columns = table.column_names();
154        let mut best_match: Option<(String, usize)> = None;
155
156        for col in columns {
157            let distance = edit_distance(name, &col);
158            if distance <= 2 {
159                // Allow up to 2 character differences
160                match best_match {
161                    Some((_, best_dist)) if distance < best_dist => {
162                        best_match = Some((col.clone(), distance));
163                    }
164                    None => {
165                        best_match = Some((col.clone(), distance));
166                    }
167                    _ => {}
168                }
169            }
170        }
171
172        best_match.map(|(name, _)| name)
173    }
174}
175
176impl Default for ExecutionContext {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182/// Calculate edit distance between two strings (Levenshtein distance)
183fn edit_distance(a: &str, b: &str) -> usize {
184    let len_a = a.chars().count();
185    let len_b = b.chars().count();
186
187    if len_a == 0 {
188        return len_b;
189    }
190    if len_b == 0 {
191        return len_a;
192    }
193
194    let mut matrix = vec![vec![0; len_b + 1]; len_a + 1];
195
196    for i in 0..=len_a {
197        matrix[i][0] = i;
198    }
199    for j in 0..=len_b {
200        matrix[0][j] = j;
201    }
202
203    let a_chars: Vec<char> = a.chars().collect();
204    let b_chars: Vec<char> = b.chars().collect();
205
206    for i in 1..=len_a {
207        for j in 1..=len_b {
208            let cost = if a_chars[i - 1] == b_chars[j - 1] {
209                0
210            } else {
211                1
212            };
213            matrix[i][j] = min(
214                min(matrix[i - 1][j] + 1, matrix[i][j - 1] + 1),
215                matrix[i - 1][j - 1] + cost,
216            );
217        }
218    }
219
220    matrix[len_a][len_b]
221}
222
223/// Query engine that executes SQL directly on `DataTable`
224#[derive(Clone)]
225pub struct QueryEngine {
226    case_insensitive: bool,
227    date_notation: String,
228    _behavior_config: Option<BehaviorConfig>,
229}
230
231impl Default for QueryEngine {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237impl QueryEngine {
238    #[must_use]
239    pub fn new() -> Self {
240        Self {
241            case_insensitive: false,
242            date_notation: get_date_notation(),
243            _behavior_config: None,
244        }
245    }
246
247    #[must_use]
248    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249        let case_insensitive = config.case_insensitive_default;
250        // Use get_date_notation() to respect environment variable override
251        let date_notation = get_date_notation();
252        Self {
253            case_insensitive,
254            date_notation,
255            _behavior_config: Some(config),
256        }
257    }
258
259    #[must_use]
260    pub fn with_date_notation(_date_notation: String) -> Self {
261        Self {
262            case_insensitive: false,
263            date_notation: get_date_notation(), // Always use the global function
264            _behavior_config: None,
265        }
266    }
267
268    #[must_use]
269    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270        Self {
271            case_insensitive,
272            date_notation: get_date_notation(),
273            _behavior_config: None,
274        }
275    }
276
277    #[must_use]
278    pub fn with_case_insensitive_and_date_notation(
279        case_insensitive: bool,
280        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
281    ) -> Self {
282        Self {
283            case_insensitive,
284            date_notation: get_date_notation(), // Always use the global function
285            _behavior_config: None,
286        }
287    }
288
289    /// Find a column name similar to the given name using edit distance
290    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291        let columns = table.column_names();
292        let mut best_match: Option<(String, usize)> = None;
293
294        for col in columns {
295            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296            // Only suggest if distance is small (likely a typo)
297            // Allow up to 3 edits for longer names
298            let max_distance = if name.len() > 10 { 3 } else { 2 };
299            if distance <= max_distance {
300                match &best_match {
301                    None => best_match = Some((col, distance)),
302                    Some((_, best_dist)) if distance < *best_dist => {
303                        best_match = Some((col, distance));
304                    }
305                    _ => {}
306                }
307            }
308        }
309
310        best_match.map(|(name, _)| name)
311    }
312
313    /// Calculate Levenshtein edit distance between two strings
314    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315        let len1 = s1.len();
316        let len2 = s2.len();
317        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319        for i in 0..=len1 {
320            matrix[i][0] = i;
321        }
322        for j in 0..=len2 {
323            matrix[0][j] = j;
324        }
325
326        for (i, c1) in s1.chars().enumerate() {
327            for (j, c2) in s2.chars().enumerate() {
328                let cost = usize::from(c1 != c2);
329                matrix[i + 1][j + 1] = std::cmp::min(
330                    matrix[i][j + 1] + 1, // deletion
331                    std::cmp::min(
332                        matrix[i + 1][j] + 1, // insertion
333                        matrix[i][j] + cost,  // substitution
334                    ),
335                );
336            }
337        }
338
339        matrix[len1][len2]
340    }
341
342    /// Check if an expression contains UNNEST function call
343    fn contains_unnest(expr: &SqlExpression) -> bool {
344        match expr {
345            // Direct UNNEST variant
346            SqlExpression::Unnest { .. } => true,
347            SqlExpression::FunctionCall { name, args, .. } => {
348                if name.to_uppercase() == "UNNEST" {
349                    return true;
350                }
351                // Check recursively in function arguments
352                args.iter().any(Self::contains_unnest)
353            }
354            SqlExpression::BinaryOp { left, right, .. } => {
355                Self::contains_unnest(left) || Self::contains_unnest(right)
356            }
357            SqlExpression::Not { expr } => Self::contains_unnest(expr),
358            SqlExpression::CaseExpression {
359                when_branches,
360                else_branch,
361            } => {
362                when_branches.iter().any(|branch| {
363                    Self::contains_unnest(&branch.condition)
364                        || Self::contains_unnest(&branch.result)
365                }) || else_branch
366                    .as_ref()
367                    .map_or(false, |e| Self::contains_unnest(e))
368            }
369            SqlExpression::SimpleCaseExpression {
370                expr,
371                when_branches,
372                else_branch,
373            } => {
374                Self::contains_unnest(expr)
375                    || when_branches.iter().any(|branch| {
376                        Self::contains_unnest(&branch.value)
377                            || Self::contains_unnest(&branch.result)
378                    })
379                    || else_branch
380                        .as_ref()
381                        .map_or(false, |e| Self::contains_unnest(e))
382            }
383            SqlExpression::InList { expr, values } => {
384                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385            }
386            SqlExpression::NotInList { expr, values } => {
387                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388            }
389            SqlExpression::Between { expr, lower, upper } => {
390                Self::contains_unnest(expr)
391                    || Self::contains_unnest(lower)
392                    || Self::contains_unnest(upper)
393            }
394            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
397            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399            SqlExpression::ChainedMethodCall { base, args, .. } => {
400                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401            }
402            _ => false,
403        }
404    }
405
406    /// Collect all WindowSpecs from an expression (helper for pre-creating contexts)
407    fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408        match expr {
409            SqlExpression::WindowFunction {
410                window_spec, args, ..
411            } => {
412                // Add this window spec
413                specs.push(window_spec.clone());
414                // Recursively check arguments
415                for arg in args {
416                    Self::collect_window_specs(arg, specs);
417                }
418            }
419            SqlExpression::BinaryOp { left, right, .. } => {
420                Self::collect_window_specs(left, specs);
421                Self::collect_window_specs(right, specs);
422            }
423            SqlExpression::Not { expr } => {
424                Self::collect_window_specs(expr, specs);
425            }
426            SqlExpression::FunctionCall { args, .. } => {
427                for arg in args {
428                    Self::collect_window_specs(arg, specs);
429                }
430            }
431            SqlExpression::CaseExpression {
432                when_branches,
433                else_branch,
434            } => {
435                for branch in when_branches {
436                    Self::collect_window_specs(&branch.condition, specs);
437                    Self::collect_window_specs(&branch.result, specs);
438                }
439                if let Some(else_expr) = else_branch {
440                    Self::collect_window_specs(else_expr, specs);
441                }
442            }
443            SqlExpression::SimpleCaseExpression {
444                expr,
445                when_branches,
446                else_branch,
447            } => {
448                Self::collect_window_specs(expr, specs);
449                for branch in when_branches {
450                    Self::collect_window_specs(&branch.value, specs);
451                    Self::collect_window_specs(&branch.result, specs);
452                }
453                if let Some(else_expr) = else_branch {
454                    Self::collect_window_specs(else_expr, specs);
455                }
456            }
457            SqlExpression::InList { expr, values, .. } => {
458                Self::collect_window_specs(expr, specs);
459                for item in values {
460                    Self::collect_window_specs(item, specs);
461                }
462            }
463            SqlExpression::ChainedMethodCall { base, args, .. } => {
464                Self::collect_window_specs(base, specs);
465                for arg in args {
466                    Self::collect_window_specs(arg, specs);
467                }
468            }
469            // Leaf nodes - no recursion needed
470            SqlExpression::Column(_)
471            | SqlExpression::NumberLiteral(_)
472            | SqlExpression::StringLiteral(_)
473            | SqlExpression::BooleanLiteral(_)
474            | SqlExpression::Null
475            | SqlExpression::DateTimeToday { .. }
476            | SqlExpression::DateTimeConstructor { .. }
477            | SqlExpression::MethodCall { .. } => {}
478            // Catch-all for any other variants
479            _ => {}
480        }
481    }
482
483    /// Check if an expression contains a window function
484    fn contains_window_function(expr: &SqlExpression) -> bool {
485        match expr {
486            SqlExpression::WindowFunction { .. } => true,
487            SqlExpression::BinaryOp { left, right, .. } => {
488                Self::contains_window_function(left) || Self::contains_window_function(right)
489            }
490            SqlExpression::Not { expr } => Self::contains_window_function(expr),
491            SqlExpression::FunctionCall { args, .. } => {
492                args.iter().any(Self::contains_window_function)
493            }
494            SqlExpression::CaseExpression {
495                when_branches,
496                else_branch,
497            } => {
498                when_branches.iter().any(|branch| {
499                    Self::contains_window_function(&branch.condition)
500                        || Self::contains_window_function(&branch.result)
501                }) || else_branch
502                    .as_ref()
503                    .map_or(false, |e| Self::contains_window_function(e))
504            }
505            SqlExpression::SimpleCaseExpression {
506                expr,
507                when_branches,
508                else_branch,
509            } => {
510                Self::contains_window_function(expr)
511                    || when_branches.iter().any(|branch| {
512                        Self::contains_window_function(&branch.value)
513                            || Self::contains_window_function(&branch.result)
514                    })
515                    || else_branch
516                        .as_ref()
517                        .map_or(false, |e| Self::contains_window_function(e))
518            }
519            SqlExpression::InList { expr, values } => {
520                Self::contains_window_function(expr)
521                    || values.iter().any(Self::contains_window_function)
522            }
523            SqlExpression::NotInList { expr, values } => {
524                Self::contains_window_function(expr)
525                    || values.iter().any(Self::contains_window_function)
526            }
527            SqlExpression::Between { expr, lower, upper } => {
528                Self::contains_window_function(expr)
529                    || Self::contains_window_function(lower)
530                    || Self::contains_window_function(upper)
531            }
532            SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533            SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534            SqlExpression::MethodCall { args, .. } => {
535                args.iter().any(Self::contains_window_function)
536            }
537            SqlExpression::ChainedMethodCall { base, args, .. } => {
538                Self::contains_window_function(base)
539                    || args.iter().any(Self::contains_window_function)
540            }
541            _ => false,
542        }
543    }
544
545    /// Extract all window function specifications from select items
546    fn extract_window_specs(
547        items: &[SelectItem],
548    ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549        let mut specs = Vec::new();
550        for (idx, item) in items.iter().enumerate() {
551            if let SelectItem::Expression { expr, .. } = item {
552                Self::collect_window_function_specs(expr, idx, &mut specs);
553            }
554        }
555        specs
556    }
557
558    /// Recursively collect window function specs from an expression
559    fn collect_window_function_specs(
560        expr: &SqlExpression,
561        output_column_index: usize,
562        specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563    ) {
564        match expr {
565            SqlExpression::WindowFunction {
566                name,
567                args,
568                window_spec,
569            } => {
570                specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571                    spec: window_spec.clone(),
572                    function_name: name.clone(),
573                    args: args.clone(),
574                    output_column_index,
575                });
576            }
577            SqlExpression::BinaryOp { left, right, .. } => {
578                Self::collect_window_function_specs(left, output_column_index, specs);
579                Self::collect_window_function_specs(right, output_column_index, specs);
580            }
581            SqlExpression::Not { expr } => {
582                Self::collect_window_function_specs(expr, output_column_index, specs);
583            }
584            SqlExpression::FunctionCall { args, .. } => {
585                for arg in args {
586                    Self::collect_window_function_specs(arg, output_column_index, specs);
587                }
588            }
589            SqlExpression::CaseExpression {
590                when_branches,
591                else_branch,
592            } => {
593                for branch in when_branches {
594                    Self::collect_window_function_specs(
595                        &branch.condition,
596                        output_column_index,
597                        specs,
598                    );
599                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600                }
601                if let Some(e) = else_branch {
602                    Self::collect_window_function_specs(e, output_column_index, specs);
603                }
604            }
605            SqlExpression::SimpleCaseExpression {
606                expr,
607                when_branches,
608                else_branch,
609            } => {
610                Self::collect_window_function_specs(expr, output_column_index, specs);
611                for branch in when_branches {
612                    Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614                }
615                if let Some(e) = else_branch {
616                    Self::collect_window_function_specs(e, output_column_index, specs);
617                }
618            }
619            SqlExpression::InList { expr, values } => {
620                Self::collect_window_function_specs(expr, output_column_index, specs);
621                for val in values {
622                    Self::collect_window_function_specs(val, output_column_index, specs);
623                }
624            }
625            SqlExpression::NotInList { expr, values } => {
626                Self::collect_window_function_specs(expr, output_column_index, specs);
627                for val in values {
628                    Self::collect_window_function_specs(val, output_column_index, specs);
629                }
630            }
631            SqlExpression::Between { expr, lower, upper } => {
632                Self::collect_window_function_specs(expr, output_column_index, specs);
633                Self::collect_window_function_specs(lower, output_column_index, specs);
634                Self::collect_window_function_specs(upper, output_column_index, specs);
635            }
636            SqlExpression::InSubquery { expr, .. } => {
637                Self::collect_window_function_specs(expr, output_column_index, specs);
638            }
639            SqlExpression::NotInSubquery { expr, .. } => {
640                Self::collect_window_function_specs(expr, output_column_index, specs);
641            }
642            SqlExpression::MethodCall { args, .. } => {
643                for arg in args {
644                    Self::collect_window_function_specs(arg, output_column_index, specs);
645                }
646            }
647            SqlExpression::ChainedMethodCall { base, args, .. } => {
648                Self::collect_window_function_specs(base, output_column_index, specs);
649                for arg in args {
650                    Self::collect_window_function_specs(arg, output_column_index, specs);
651                }
652            }
653            _ => {} // Other expression types don't contain window functions
654        }
655    }
656
657    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
658    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659        let (view, _plan) = self.execute_with_plan(table, sql)?;
660        Ok(view)
661    }
662
663    /// Execute a SQL query with optional temp table registry access
664    pub fn execute_with_temp_tables(
665        &self,
666        table: Arc<DataTable>,
667        sql: &str,
668        temp_tables: Option<&TempTableRegistry>,
669    ) -> Result<DataView> {
670        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671        Ok(view)
672    }
673
674    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
675    pub fn execute_statement(
676        &self,
677        table: Arc<DataTable>,
678        statement: SelectStatement,
679    ) -> Result<DataView> {
680        self.execute_statement_with_temp_tables(table, statement, None)
681    }
682
683    /// Execute a parsed SelectStatement with optional temp table access
684    pub fn execute_statement_with_temp_tables(
685        &self,
686        table: Arc<DataTable>,
687        statement: SelectStatement,
688        temp_tables: Option<&TempTableRegistry>,
689    ) -> Result<DataView> {
690        // First process CTEs to build context
691        let mut cte_context = HashMap::new();
692
693        // Add temp tables to CTE context if provided
694        if let Some(temp_registry) = temp_tables {
695            for table_name in temp_registry.list_tables() {
696                if let Some(temp_table) = temp_registry.get(&table_name) {
697                    debug!("Adding temp table {} to CTE context", table_name);
698                    let view = DataView::new(temp_table);
699                    cte_context.insert(table_name, Arc::new(view));
700                }
701            }
702        }
703
704        for cte in &statement.ctes {
705            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
706            // Execute the CTE based on its type
707            let cte_result = match &cte.cte_type {
708                CTEType::Standard(query) => {
709                    // Execute the CTE query (it might reference earlier CTEs)
710                    let view = self.build_view_with_context(
711                        table.clone(),
712                        query.clone(),
713                        &mut cte_context,
714                    )?;
715
716                    // Materialize the view and enrich columns with qualified names
717                    let mut materialized = self.materialize_view(view)?;
718
719                    // Enrich columns with qualified names for proper scoping
720                    for column in materialized.columns_mut() {
721                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
722                        column.source_table = Some(cte.name.clone());
723                    }
724
725                    DataView::new(Arc::new(materialized))
726                }
727                CTEType::Web(web_spec) => {
728                    // Fetch data from URL
729                    use crate::web::http_fetcher::WebDataFetcher;
730
731                    let fetcher = WebDataFetcher::new()?;
732                    // Pass None for query context (no full SQL available in these contexts)
733                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
734
735                    // Enrich columns with qualified names for proper scoping
736                    for column in data_table.columns_mut() {
737                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
738                        column.source_table = Some(cte.name.clone());
739                    }
740
741                    // Convert DataTable to DataView
742                    DataView::new(Arc::new(data_table))
743                }
744            };
745            // Store the result in the context for later use
746            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
747            debug!(
748                "QueryEngine: CTE '{}' pre-processed, stored in context",
749                cte.name
750            );
751        }
752
753        // Now process subqueries with CTE context available
754        let mut subquery_executor =
755            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
756        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
757
758        // Build the view with the same CTE context
759        self.build_view_with_context(table, processed_statement, &mut cte_context)
760    }
761
762    /// Execute a statement with provided CTE context (for subqueries)
763    pub fn execute_statement_with_cte_context(
764        &self,
765        table: Arc<DataTable>,
766        statement: SelectStatement,
767        cte_context: &HashMap<String, Arc<DataView>>,
768    ) -> Result<DataView> {
769        // Clone the context so we can add any CTEs from this statement
770        let mut local_context = cte_context.clone();
771
772        // Process any CTEs in this statement (they might be nested)
773        for cte in &statement.ctes {
774            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
775            let cte_result = match &cte.cte_type {
776                CTEType::Standard(query) => {
777                    let view = self.build_view_with_context(
778                        table.clone(),
779                        query.clone(),
780                        &mut local_context,
781                    )?;
782
783                    // Materialize the view and enrich columns with qualified names
784                    let mut materialized = self.materialize_view(view)?;
785
786                    // Enrich columns with qualified names for proper scoping
787                    for column in materialized.columns_mut() {
788                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
789                        column.source_table = Some(cte.name.clone());
790                    }
791
792                    DataView::new(Arc::new(materialized))
793                }
794                CTEType::Web(web_spec) => {
795                    // Fetch data from URL
796                    use crate::web::http_fetcher::WebDataFetcher;
797
798                    let fetcher = WebDataFetcher::new()?;
799                    // Pass None for query context (no full SQL available in these contexts)
800                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
801
802                    // Enrich columns with qualified names for proper scoping
803                    for column in data_table.columns_mut() {
804                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
805                        column.source_table = Some(cte.name.clone());
806                    }
807
808                    // Convert DataTable to DataView
809                    DataView::new(Arc::new(data_table))
810                }
811            };
812            local_context.insert(cte.name.clone(), Arc::new(cte_result));
813        }
814
815        // Process subqueries with the complete context
816        let mut subquery_executor =
817            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
818        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
819
820        // Build the view
821        self.build_view_with_context(table, processed_statement, &mut local_context)
822    }
823
824    /// Execute a query and return both the result and the execution plan
825    pub fn execute_with_plan(
826        &self,
827        table: Arc<DataTable>,
828        sql: &str,
829    ) -> Result<(DataView, ExecutionPlan)> {
830        self.execute_with_plan_and_temp_tables(table, sql, None)
831    }
832
833    /// Execute a query with temp tables and return both the result and the execution plan
834    pub fn execute_with_plan_and_temp_tables(
835        &self,
836        table: Arc<DataTable>,
837        sql: &str,
838        temp_tables: Option<&TempTableRegistry>,
839    ) -> Result<(DataView, ExecutionPlan)> {
840        let mut plan_builder = ExecutionPlanBuilder::new();
841        let start_time = Instant::now();
842
843        // Parse the SQL query
844        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
845        plan_builder.add_detail(format!("Query: {}", sql));
846        let mut parser = Parser::new(sql);
847        let statement = parser
848            .parse()
849            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
850        plan_builder.add_detail(format!("Parsed successfully"));
851        if let Some(from) = &statement.from_table {
852            plan_builder.add_detail(format!("FROM: {}", from));
853        }
854        if statement.where_clause.is_some() {
855            plan_builder.add_detail("WHERE clause present".to_string());
856        }
857        plan_builder.end_step();
858
859        // First process CTEs to build context
860        let mut cte_context = HashMap::new();
861
862        // Add temp tables to CTE context if provided
863        if let Some(temp_registry) = temp_tables {
864            for table_name in temp_registry.list_tables() {
865                if let Some(temp_table) = temp_registry.get(&table_name) {
866                    debug!("Adding temp table {} to CTE context", table_name);
867                    let view = DataView::new(temp_table);
868                    cte_context.insert(table_name, Arc::new(view));
869                }
870            }
871        }
872
873        if !statement.ctes.is_empty() {
874            plan_builder.begin_step(
875                StepType::CTE,
876                format!("Process {} CTEs", statement.ctes.len()),
877            );
878
879            for cte in &statement.ctes {
880                let cte_start = Instant::now();
881                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
882
883                let cte_result = match &cte.cte_type {
884                    CTEType::Standard(query) => {
885                        // Add CTE query details
886                        if let Some(from) = &query.from_table {
887                            plan_builder.add_detail(format!("Source: {}", from));
888                        }
889                        if query.where_clause.is_some() {
890                            plan_builder.add_detail("Has WHERE clause".to_string());
891                        }
892                        if query.group_by.is_some() {
893                            plan_builder.add_detail("Has GROUP BY".to_string());
894                        }
895
896                        debug!(
897                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
898                            cte.name,
899                            cte_context.keys().collect::<Vec<_>>()
900                        );
901
902                        // Process subqueries in the CTE's query FIRST
903                        // This allows the subqueries to see all previously defined CTEs
904                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
905                            self.clone(),
906                            table.clone(),
907                            cte_context.clone(),
908                        );
909                        let processed_query = subquery_executor.execute_subqueries(query)?;
910
911                        let view = self.build_view_with_context(
912                            table.clone(),
913                            processed_query,
914                            &mut cte_context,
915                        )?;
916
917                        // Materialize the view and enrich columns with qualified names
918                        let mut materialized = self.materialize_view(view)?;
919
920                        // Enrich columns with qualified names for proper scoping
921                        for column in materialized.columns_mut() {
922                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
923                            column.source_table = Some(cte.name.clone());
924                        }
925
926                        DataView::new(Arc::new(materialized))
927                    }
928                    CTEType::Web(web_spec) => {
929                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
930                        if let Some(format) = &web_spec.format {
931                            plan_builder.add_detail(format!("Format: {:?}", format));
932                        }
933                        if let Some(cache) = web_spec.cache_seconds {
934                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
935                        }
936
937                        // Fetch data from URL
938                        use crate::web::http_fetcher::WebDataFetcher;
939
940                        let fetcher = WebDataFetcher::new()?;
941                        // Pass None for query context - each WEB CTE is independent
942                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
943
944                        // Enrich columns with qualified names for proper scoping
945                        for column in data_table.columns_mut() {
946                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
947                            column.source_table = Some(cte.name.clone());
948                        }
949
950                        // Convert DataTable to DataView
951                        DataView::new(Arc::new(data_table))
952                    }
953                };
954
955                // Record CTE statistics
956                plan_builder.set_rows_out(cte_result.row_count());
957                plan_builder.add_detail(format!(
958                    "Result: {} rows, {} columns",
959                    cte_result.row_count(),
960                    cte_result.column_count()
961                ));
962                plan_builder.add_detail(format!(
963                    "Execution time: {:.3}ms",
964                    cte_start.elapsed().as_secs_f64() * 1000.0
965                ));
966
967                debug!(
968                    "QueryEngine: Storing CTE '{}' in context with {} rows",
969                    cte.name,
970                    cte_result.row_count()
971                );
972                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
973                plan_builder.end_step();
974            }
975
976            plan_builder.add_detail(format!(
977                "All {} CTEs cached in context",
978                statement.ctes.len()
979            ));
980            plan_builder.end_step();
981        }
982
983        // Process subqueries in the statement with CTE context
984        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
985        let mut subquery_executor =
986            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
987
988        // Check if there are subqueries to process
989        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
990            // This is a simplified check - in reality we'd need to walk the AST
991            format!("{:?}", w).contains("Subquery")
992        });
993
994        if has_subqueries {
995            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
996        }
997
998        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
999
1000        if has_subqueries {
1001            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1002        } else {
1003            plan_builder.add_detail("No subqueries to process".to_string());
1004        }
1005
1006        plan_builder.end_step();
1007        let result = self.build_view_with_context_and_plan(
1008            table,
1009            processed_statement,
1010            &mut cte_context,
1011            &mut plan_builder,
1012        )?;
1013
1014        let total_duration = start_time.elapsed();
1015        info!(
1016            "Query execution complete: total={:?}, rows={}",
1017            total_duration,
1018            result.row_count()
1019        );
1020
1021        let plan = plan_builder.build();
1022        Ok((result, plan))
1023    }
1024
1025    /// Build a `DataView` from a parsed SQL statement
1026    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1027        let mut cte_context = HashMap::new();
1028        self.build_view_with_context(table, statement, &mut cte_context)
1029    }
1030
1031    /// Build a DataView from a SelectStatement with CTE context
1032    fn build_view_with_context(
1033        &self,
1034        table: Arc<DataTable>,
1035        statement: SelectStatement,
1036        cte_context: &mut HashMap<String, Arc<DataView>>,
1037    ) -> Result<DataView> {
1038        let mut dummy_plan = ExecutionPlanBuilder::new();
1039        let mut exec_context = ExecutionContext::new();
1040        self.build_view_with_context_and_plan_and_exec(
1041            table,
1042            statement,
1043            cte_context,
1044            &mut dummy_plan,
1045            &mut exec_context,
1046        )
1047    }
1048
1049    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
1050    fn build_view_with_context_and_plan(
1051        &self,
1052        table: Arc<DataTable>,
1053        statement: SelectStatement,
1054        cte_context: &mut HashMap<String, Arc<DataView>>,
1055        plan: &mut ExecutionPlanBuilder,
1056    ) -> Result<DataView> {
1057        let mut exec_context = ExecutionContext::new();
1058        self.build_view_with_context_and_plan_and_exec(
1059            table,
1060            statement,
1061            cte_context,
1062            plan,
1063            &mut exec_context,
1064        )
1065    }
1066
1067    /// Build a DataView with CTE context, execution plan, and alias resolution context
1068    fn build_view_with_context_and_plan_and_exec(
1069        &self,
1070        table: Arc<DataTable>,
1071        statement: SelectStatement,
1072        cte_context: &mut HashMap<String, Arc<DataView>>,
1073        plan: &mut ExecutionPlanBuilder,
1074        exec_context: &mut ExecutionContext,
1075    ) -> Result<DataView> {
1076        // First, process any CTEs that aren't already in the context
1077        for cte in &statement.ctes {
1078            // Skip if already processed (e.g., by execute_select for WEB CTEs)
1079            if cte_context.contains_key(&cte.name) {
1080                debug!(
1081                    "QueryEngine: CTE '{}' already in context, skipping",
1082                    cte.name
1083                );
1084                continue;
1085            }
1086
1087            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1088            debug!(
1089                "QueryEngine: Available CTEs for '{}': {:?}",
1090                cte.name,
1091                cte_context.keys().collect::<Vec<_>>()
1092            );
1093
1094            // Execute the CTE query (it might reference earlier CTEs)
1095            let cte_result = match &cte.cte_type {
1096                CTEType::Standard(query) => {
1097                    let view =
1098                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1099
1100                    // Materialize the view and enrich columns with qualified names
1101                    let mut materialized = self.materialize_view(view)?;
1102
1103                    // Enrich columns with qualified names for proper scoping
1104                    for column in materialized.columns_mut() {
1105                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1106                        column.source_table = Some(cte.name.clone());
1107                    }
1108
1109                    DataView::new(Arc::new(materialized))
1110                }
1111                CTEType::Web(_web_spec) => {
1112                    // Web CTEs should have been processed earlier in execute_select
1113                    return Err(anyhow!(
1114                        "Web CTEs should be processed in execute_select method"
1115                    ));
1116                }
1117            };
1118
1119            // Store the result in the context for later use
1120            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1121            debug!(
1122                "QueryEngine: CTE '{}' processed, stored in context",
1123                cte.name
1124            );
1125        }
1126
1127        // Determine the source table for the main query
1128        let source_table = if let Some(ref table_func) = statement.from_function {
1129            // Handle table functions like RANGE()
1130            debug!("QueryEngine: Processing table function...");
1131            match table_func {
1132                TableFunction::Generator { name, args } => {
1133                    // Use the generator registry to create the table
1134                    use crate::sql::generators::GeneratorRegistry;
1135
1136                    // Create generator registry (could be cached in QueryEngine)
1137                    let registry = GeneratorRegistry::new();
1138
1139                    if let Some(generator) = registry.get(name) {
1140                        // Evaluate arguments
1141                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
1142                            &table,
1143                            self.date_notation.clone(),
1144                        );
1145                        let dummy_row = 0;
1146
1147                        let mut evaluated_args = Vec::new();
1148                        for arg in args {
1149                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1150                        }
1151
1152                        // Generate the table
1153                        generator.generate(evaluated_args)?
1154                    } else {
1155                        return Err(anyhow!("Unknown generator function: {}", name));
1156                    }
1157                }
1158            }
1159        } else if let Some(ref subquery) = statement.from_subquery {
1160            // Execute the subquery and use its result as the source
1161            debug!("QueryEngine: Processing FROM subquery...");
1162            let subquery_result =
1163                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
1164
1165            // Convert the DataView to a DataTable for use as source
1166            // This materializes the subquery result
1167            let materialized = self.materialize_view(subquery_result)?;
1168            Arc::new(materialized)
1169        } else if let Some(ref table_name) = statement.from_table {
1170            // Check if this references a CTE
1171            if let Some(cte_view) = cte_context.get(table_name) {
1172                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1173                // Materialize the CTE view as a table
1174                let mut materialized = self.materialize_view((**cte_view).clone())?;
1175
1176                // Apply alias to qualified column names if present
1177                if let Some(ref alias) = statement.from_alias {
1178                    debug!(
1179                        "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1180                        alias, table_name
1181                    );
1182                    for column in materialized.columns_mut() {
1183                        // Replace the CTE name with the alias in qualified names
1184                        if let Some(ref qualified_name) = column.qualified_name {
1185                            if qualified_name.starts_with(&format!("{}.", table_name)) {
1186                                column.qualified_name =
1187                                    Some(qualified_name.replace(
1188                                        &format!("{}.", table_name),
1189                                        &format!("{}.", alias),
1190                                    ));
1191                            }
1192                        }
1193                        // Update source table to reflect the alias
1194                        if column.source_table.as_ref() == Some(table_name) {
1195                            column.source_table = Some(alias.clone());
1196                        }
1197                    }
1198                }
1199
1200                Arc::new(materialized)
1201            } else {
1202                // Regular table reference - use the provided table
1203                table.clone()
1204            }
1205        } else {
1206            // No FROM clause - use the provided table
1207            table.clone()
1208        };
1209
1210        // Register alias in execution context if present
1211        if let Some(ref alias) = statement.from_alias {
1212            if let Some(ref table_name) = statement.from_table {
1213                exec_context.register_alias(alias.clone(), table_name.clone());
1214            }
1215        }
1216
1217        // Process JOINs if present
1218        let final_table = if !statement.joins.is_empty() {
1219            plan.begin_step(
1220                StepType::Join,
1221                format!("Process {} JOINs", statement.joins.len()),
1222            );
1223            plan.set_rows_in(source_table.row_count());
1224
1225            let join_executor = HashJoinExecutor::new(self.case_insensitive);
1226            let mut current_table = source_table;
1227
1228            for (idx, join_clause) in statement.joins.iter().enumerate() {
1229                let join_start = Instant::now();
1230                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1231                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1232                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1233                plan.add_detail(format!(
1234                    "Executing {:?} JOIN on {} condition(s)",
1235                    join_clause.join_type,
1236                    join_clause.condition.conditions.len()
1237                ));
1238
1239                // Resolve the right table for the join
1240                let right_table = match &join_clause.table {
1241                    TableSource::Table(name) => {
1242                        // Check if it's a CTE reference
1243                        if let Some(cte_view) = cte_context.get(name) {
1244                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1245
1246                            // Apply alias to qualified column names if present
1247                            if let Some(ref alias) = join_clause.alias {
1248                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1249                                for column in materialized.columns_mut() {
1250                                    // Replace the CTE name with the alias in qualified names
1251                                    if let Some(ref qualified_name) = column.qualified_name {
1252                                        if qualified_name.starts_with(&format!("{}.", name)) {
1253                                            column.qualified_name = Some(qualified_name.replace(
1254                                                &format!("{}.", name),
1255                                                &format!("{}.", alias),
1256                                            ));
1257                                        }
1258                                    }
1259                                    // Update source table to reflect the alias
1260                                    if column.source_table.as_ref() == Some(name) {
1261                                        column.source_table = Some(alias.clone());
1262                                    }
1263                                }
1264                            }
1265
1266                            Arc::new(materialized)
1267                        } else {
1268                            // For now, we need the actual table data
1269                            // In a real implementation, this would load from file
1270                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1271                        }
1272                    }
1273                    TableSource::DerivedTable { query, alias: _ } => {
1274                        // Execute the subquery
1275                        let subquery_result = self.build_view_with_context(
1276                            table.clone(),
1277                            *query.clone(),
1278                            cte_context,
1279                        )?;
1280                        let materialized = self.materialize_view(subquery_result)?;
1281                        Arc::new(materialized)
1282                    }
1283                };
1284
1285                // Execute the join
1286                let joined = join_executor.execute_join(
1287                    current_table.clone(),
1288                    join_clause,
1289                    right_table.clone(),
1290                )?;
1291
1292                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1293                plan.set_rows_out(joined.row_count());
1294                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1295                plan.add_detail(format!(
1296                    "Join time: {:.3}ms",
1297                    join_start.elapsed().as_secs_f64() * 1000.0
1298                ));
1299                plan.end_step();
1300
1301                current_table = Arc::new(joined);
1302            }
1303
1304            plan.set_rows_out(current_table.row_count());
1305            plan.add_detail(format!(
1306                "Final result after all joins: {} rows",
1307                current_table.row_count()
1308            ));
1309            plan.end_step();
1310            current_table
1311        } else {
1312            source_table
1313        };
1314
1315        // Continue with the existing build_view logic but using final_table
1316        self.build_view_internal_with_plan_and_exec(
1317            final_table,
1318            statement,
1319            plan,
1320            Some(exec_context),
1321        )
1322    }
1323
1324    /// Materialize a DataView into a new DataTable
1325    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1326        let source = view.source();
1327        let mut result_table = DataTable::new("derived");
1328
1329        // Get the visible columns from the view
1330        let visible_cols = view.visible_column_indices().to_vec();
1331
1332        // Copy column definitions
1333        for col_idx in &visible_cols {
1334            let col = &source.columns[*col_idx];
1335            let new_col = DataColumn {
1336                name: col.name.clone(),
1337                data_type: col.data_type.clone(),
1338                nullable: col.nullable,
1339                unique_values: col.unique_values,
1340                null_count: col.null_count,
1341                metadata: col.metadata.clone(),
1342                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1343                source_table: col.source_table.clone(),     // Preserve source table
1344            };
1345            result_table.add_column(new_col);
1346        }
1347
1348        // Copy visible rows
1349        for row_idx in view.visible_row_indices() {
1350            let source_row = &source.rows[*row_idx];
1351            let mut new_row = DataRow { values: Vec::new() };
1352
1353            for col_idx in &visible_cols {
1354                new_row.values.push(source_row.values[*col_idx].clone());
1355            }
1356
1357            result_table.add_row(new_row);
1358        }
1359
1360        Ok(result_table)
1361    }
1362
1363    fn build_view_internal(
1364        &self,
1365        table: Arc<DataTable>,
1366        statement: SelectStatement,
1367    ) -> Result<DataView> {
1368        let mut dummy_plan = ExecutionPlanBuilder::new();
1369        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1370    }
1371
1372    fn build_view_internal_with_plan(
1373        &self,
1374        table: Arc<DataTable>,
1375        statement: SelectStatement,
1376        plan: &mut ExecutionPlanBuilder,
1377    ) -> Result<DataView> {
1378        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1379    }
1380
1381    fn build_view_internal_with_plan_and_exec(
1382        &self,
1383        table: Arc<DataTable>,
1384        statement: SelectStatement,
1385        plan: &mut ExecutionPlanBuilder,
1386        exec_context: Option<&ExecutionContext>,
1387    ) -> Result<DataView> {
1388        debug!(
1389            "QueryEngine::build_view - select_items: {:?}",
1390            statement.select_items
1391        );
1392        debug!(
1393            "QueryEngine::build_view - where_clause: {:?}",
1394            statement.where_clause
1395        );
1396
1397        // Start with all rows visible
1398        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1399
1400        // Apply WHERE clause filtering using recursive evaluator
1401        if let Some(where_clause) = &statement.where_clause {
1402            let total_rows = table.row_count();
1403            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1404            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1405
1406            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1407            plan.set_rows_in(total_rows);
1408            plan.add_detail(format!("Input: {} rows", total_rows));
1409
1410            // Add details about WHERE conditions
1411            for condition in &where_clause.conditions {
1412                plan.add_detail(format!("Condition: {:?}", condition.expr));
1413            }
1414
1415            let filter_start = Instant::now();
1416            // Create an evaluation context for caching compiled regexes
1417            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1418
1419            // Create evaluator ONCE before the loop for performance
1420            let mut evaluator = if let Some(exec_ctx) = exec_context {
1421                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1422                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1423            } else {
1424                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1425            };
1426
1427            // Filter visible rows based on WHERE clause
1428            let mut filtered_rows = Vec::new();
1429            for row_idx in visible_rows {
1430                // Only log for first few rows to avoid performance impact
1431                if row_idx < 3 {
1432                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1433                }
1434
1435                match evaluator.evaluate(where_clause, row_idx) {
1436                    Ok(result) => {
1437                        if row_idx < 3 {
1438                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1439                        }
1440                        if result {
1441                            filtered_rows.push(row_idx);
1442                        }
1443                    }
1444                    Err(e) => {
1445                        if row_idx < 3 {
1446                            debug!(
1447                                "QueryEngine: WHERE evaluation error for row {}: {}",
1448                                row_idx, e
1449                            );
1450                        }
1451                        // Propagate WHERE clause errors instead of silently ignoring them
1452                        return Err(e);
1453                    }
1454                }
1455            }
1456
1457            // Log regex cache statistics
1458            let (compilations, cache_hits) = eval_context.get_stats();
1459            if compilations > 0 || cache_hits > 0 {
1460                debug!(
1461                    "LIKE pattern cache: {} compilations, {} cache hits",
1462                    compilations, cache_hits
1463                );
1464            }
1465            visible_rows = filtered_rows;
1466            let filter_duration = filter_start.elapsed();
1467            info!(
1468                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1469                total_rows,
1470                visible_rows.len(),
1471                filter_duration
1472            );
1473
1474            plan.set_rows_out(visible_rows.len());
1475            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1476            plan.add_detail(format!(
1477                "Filter time: {:.3}ms",
1478                filter_duration.as_secs_f64() * 1000.0
1479            ));
1480            plan.end_step();
1481        }
1482
1483        // Create initial DataView with filtered rows
1484        let mut view = DataView::new(table.clone());
1485        view = view.with_rows(visible_rows);
1486
1487        // Handle GROUP BY if present
1488        if let Some(group_by_exprs) = &statement.group_by {
1489            if !group_by_exprs.is_empty() {
1490                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1491
1492                plan.begin_step(
1493                    StepType::GroupBy,
1494                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1495                );
1496                plan.set_rows_in(view.row_count());
1497                plan.add_detail(format!("Input: {} rows", view.row_count()));
1498                for expr in group_by_exprs {
1499                    plan.add_detail(format!("Group by: {:?}", expr));
1500                }
1501
1502                let group_start = Instant::now();
1503                view = self.apply_group_by(
1504                    view,
1505                    group_by_exprs,
1506                    &statement.select_items,
1507                    statement.having.as_ref(),
1508                    plan,
1509                )?;
1510
1511                plan.set_rows_out(view.row_count());
1512                plan.add_detail(format!("Output: {} groups", view.row_count()));
1513                plan.add_detail(format!(
1514                    "Overall time: {:.3}ms",
1515                    group_start.elapsed().as_secs_f64() * 1000.0
1516                ));
1517                plan.end_step();
1518            }
1519        } else {
1520            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1521            if !statement.select_items.is_empty() {
1522                // Check if we have ANY non-star items (not just the first one)
1523                let has_non_star_items = statement
1524                    .select_items
1525                    .iter()
1526                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1527
1528                // Apply select items if:
1529                // 1. We have computed expressions or explicit columns
1530                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1531                if has_non_star_items || statement.select_items.len() > 1 {
1532                    view = self.apply_select_items(
1533                        view,
1534                        &statement.select_items,
1535                        &statement,
1536                        exec_context,
1537                        plan,
1538                    )?;
1539                }
1540                // If it's just a single star, no projection needed
1541            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1542                debug!("QueryEngine: Using legacy columns path");
1543                // Fallback to legacy column projection for backward compatibility
1544                // Use the current view's source table, not the original table
1545                let source_table = view.source();
1546                let column_indices =
1547                    self.resolve_column_indices(source_table, &statement.columns)?;
1548                view = view.with_columns(column_indices);
1549            }
1550        }
1551
1552        // Apply DISTINCT if specified
1553        if statement.distinct {
1554            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1555            plan.set_rows_in(view.row_count());
1556            plan.add_detail(format!("Input: {} rows", view.row_count()));
1557
1558            let distinct_start = Instant::now();
1559            view = self.apply_distinct(view)?;
1560
1561            plan.set_rows_out(view.row_count());
1562            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1563            plan.add_detail(format!(
1564                "Distinct time: {:.3}ms",
1565                distinct_start.elapsed().as_secs_f64() * 1000.0
1566            ));
1567            plan.end_step();
1568        }
1569
1570        // Apply ORDER BY sorting
1571        if let Some(order_by_columns) = &statement.order_by {
1572            if !order_by_columns.is_empty() {
1573                plan.begin_step(
1574                    StepType::Sort,
1575                    format!("ORDER BY {} columns", order_by_columns.len()),
1576                );
1577                plan.set_rows_in(view.row_count());
1578                for col in order_by_columns {
1579                    // Format the expression (simplified for now - just show column name or "expr")
1580                    let expr_str = match &col.expr {
1581                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1582                        _ => "expr".to_string(),
1583                    };
1584                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1585                }
1586
1587                let sort_start = Instant::now();
1588                view =
1589                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1590
1591                plan.add_detail(format!(
1592                    "Sort time: {:.3}ms",
1593                    sort_start.elapsed().as_secs_f64() * 1000.0
1594                ));
1595                plan.end_step();
1596            }
1597        }
1598
1599        // Apply LIMIT/OFFSET
1600        if let Some(limit) = statement.limit {
1601            let offset = statement.offset.unwrap_or(0);
1602            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1603            plan.set_rows_in(view.row_count());
1604            if offset > 0 {
1605                plan.add_detail(format!("OFFSET: {}", offset));
1606            }
1607            view = view.with_limit(limit, offset);
1608            plan.set_rows_out(view.row_count());
1609            plan.add_detail(format!("Output: {} rows", view.row_count()));
1610            plan.end_step();
1611        }
1612
1613        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1614        if !statement.set_operations.is_empty() {
1615            plan.begin_step(
1616                StepType::SetOperation,
1617                format!("Process {} set operations", statement.set_operations.len()),
1618            );
1619            plan.set_rows_in(view.row_count());
1620
1621            // Materialize the first result set
1622            let mut combined_table = self.materialize_view(view)?;
1623            let first_columns = combined_table.column_names();
1624            let first_column_count = first_columns.len();
1625
1626            // Track if any operation requires deduplication
1627            let mut needs_deduplication = false;
1628
1629            // Process each set operation
1630            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1631                let op_start = Instant::now();
1632                plan.begin_step(
1633                    StepType::SetOperation,
1634                    format!("{:?} operation #{}", operation, idx + 1),
1635                );
1636
1637                // Execute the next SELECT statement
1638                // We need to pass the original table and exec_context for proper resolution
1639                let next_view = if let Some(exec_ctx) = exec_context {
1640                    self.build_view_internal_with_plan_and_exec(
1641                        table.clone(),
1642                        *next_statement.clone(),
1643                        plan,
1644                        Some(exec_ctx),
1645                    )?
1646                } else {
1647                    self.build_view_internal_with_plan(
1648                        table.clone(),
1649                        *next_statement.clone(),
1650                        plan,
1651                    )?
1652                };
1653
1654                // Materialize the next result set
1655                let next_table = self.materialize_view(next_view)?;
1656                let next_columns = next_table.column_names();
1657                let next_column_count = next_columns.len();
1658
1659                // Validate schema compatibility
1660                if first_column_count != next_column_count {
1661                    return Err(anyhow!(
1662                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1663                        first_column_count,
1664                        idx + 2,
1665                        next_column_count
1666                    ));
1667                }
1668
1669                // Warn if column names don't match (but allow it - some SQL dialects do)
1670                for (col_idx, (first_col, next_col)) in
1671                    first_columns.iter().zip(next_columns.iter()).enumerate()
1672                {
1673                    if !first_col.eq_ignore_ascii_case(next_col) {
1674                        debug!(
1675                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1676                            col_idx + 1,
1677                            first_col,
1678                            next_col
1679                        );
1680                    }
1681                }
1682
1683                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1684                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1685
1686                // Perform the set operation
1687                match operation {
1688                    SetOperation::UnionAll => {
1689                        // UNION ALL: Simply concatenate all rows without deduplication
1690                        for row in next_table.rows.iter() {
1691                            combined_table.add_row(row.clone());
1692                        }
1693                        plan.add_detail(format!(
1694                            "Result: {} rows (no deduplication)",
1695                            combined_table.row_count()
1696                        ));
1697                    }
1698                    SetOperation::Union => {
1699                        // UNION: Concatenate all rows first, deduplicate at the end
1700                        for row in next_table.rows.iter() {
1701                            combined_table.add_row(row.clone());
1702                        }
1703                        needs_deduplication = true;
1704                        plan.add_detail(format!(
1705                            "Combined: {} rows (deduplication pending)",
1706                            combined_table.row_count()
1707                        ));
1708                    }
1709                    SetOperation::Intersect => {
1710                        // INTERSECT: Keep only rows that appear in both
1711                        // TODO: Implement intersection logic
1712                        return Err(anyhow!("INTERSECT is not yet implemented"));
1713                    }
1714                    SetOperation::Except => {
1715                        // EXCEPT: Keep only rows from left that don't appear in right
1716                        // TODO: Implement except logic
1717                        return Err(anyhow!("EXCEPT is not yet implemented"));
1718                    }
1719                }
1720
1721                plan.add_detail(format!(
1722                    "Operation time: {:.3}ms",
1723                    op_start.elapsed().as_secs_f64() * 1000.0
1724                ));
1725                plan.set_rows_out(combined_table.row_count());
1726                plan.end_step();
1727            }
1728
1729            plan.set_rows_out(combined_table.row_count());
1730            plan.add_detail(format!(
1731                "Combined result: {} rows after {} operations",
1732                combined_table.row_count(),
1733                statement.set_operations.len()
1734            ));
1735            plan.end_step();
1736
1737            // Create a new view from the combined table
1738            view = DataView::new(Arc::new(combined_table));
1739
1740            // Apply deduplication if any UNION (not UNION ALL) operation was used
1741            if needs_deduplication {
1742                plan.begin_step(
1743                    StepType::Distinct,
1744                    "UNION deduplication - remove duplicate rows".to_string(),
1745                );
1746                plan.set_rows_in(view.row_count());
1747                plan.add_detail(format!("Input: {} rows", view.row_count()));
1748
1749                let distinct_start = Instant::now();
1750                view = self.apply_distinct(view)?;
1751
1752                plan.set_rows_out(view.row_count());
1753                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1754                plan.add_detail(format!(
1755                    "Deduplication time: {:.3}ms",
1756                    distinct_start.elapsed().as_secs_f64() * 1000.0
1757                ));
1758                plan.end_step();
1759            }
1760        }
1761
1762        Ok(view)
1763    }
1764
1765    /// Resolve column names to indices
1766    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1767        let mut indices = Vec::new();
1768        let table_columns = table.column_names();
1769
1770        for col_name in columns {
1771            let index = table_columns
1772                .iter()
1773                .position(|c| c.eq_ignore_ascii_case(col_name))
1774                .ok_or_else(|| {
1775                    let suggestion = self.find_similar_column(table, col_name);
1776                    match suggestion {
1777                        Some(similar) => anyhow::anyhow!(
1778                            "Column '{}' not found. Did you mean '{}'?",
1779                            col_name,
1780                            similar
1781                        ),
1782                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1783                    }
1784                })?;
1785            indices.push(index);
1786        }
1787
1788        Ok(indices)
1789    }
1790
1791    /// Apply SELECT items (columns and computed expressions) to create new view
1792    fn apply_select_items(
1793        &self,
1794        view: DataView,
1795        select_items: &[SelectItem],
1796        _statement: &SelectStatement,
1797        exec_context: Option<&ExecutionContext>,
1798        plan: &mut ExecutionPlanBuilder,
1799    ) -> Result<DataView> {
1800        debug!(
1801            "QueryEngine::apply_select_items - items: {:?}",
1802            select_items
1803        );
1804        debug!(
1805            "QueryEngine::apply_select_items - input view has {} rows",
1806            view.row_count()
1807        );
1808
1809        // Check if any select items contain window functions
1810        let has_window_functions = select_items.iter().any(|item| match item {
1811            SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1812            _ => false,
1813        });
1814
1815        // Count window functions for detailed reporting
1816        let window_func_count: usize = select_items
1817            .iter()
1818            .filter(|item| match item {
1819                SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1820                _ => false,
1821            })
1822            .count();
1823
1824        // Start timing for window function evaluation if present
1825        let window_start = if has_window_functions {
1826            debug!(
1827                "QueryEngine::apply_select_items - detected {} window functions",
1828                window_func_count
1829            );
1830
1831            // Extract window specs (Step 2: parallel path, not used yet)
1832            let window_specs = Self::extract_window_specs(select_items);
1833            debug!("Extracted {} window function specs", window_specs.len());
1834
1835            Some(Instant::now())
1836        } else {
1837            None
1838        };
1839
1840        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
1841        let has_unnest = select_items.iter().any(|item| match item {
1842            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1843            _ => false,
1844        });
1845
1846        if has_unnest {
1847            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1848            return self.apply_select_with_row_expansion(view, select_items);
1849        }
1850
1851        // Check if this is an aggregate query:
1852        // 1. At least one aggregate function exists
1853        // 2. All other items are either aggregates or constants (aggregate-compatible)
1854        let has_aggregates = select_items.iter().any(|item| match item {
1855            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1856            SelectItem::Column { .. } => false,
1857            SelectItem::Star { .. } => false,
1858            SelectItem::StarExclude { .. } => false,
1859        });
1860
1861        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1862            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1863            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
1864            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
1865            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
1866        });
1867
1868        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1869            // Special handling for aggregate queries with constants (no GROUP BY)
1870            // These should produce exactly one row
1871            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1872            return self.apply_aggregate_select(view, select_items);
1873        }
1874
1875        // Check if we need to create computed columns
1876        let has_computed_expressions = select_items
1877            .iter()
1878            .any(|item| matches!(item, SelectItem::Expression { .. }));
1879
1880        debug!(
1881            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1882            has_computed_expressions
1883        );
1884
1885        if !has_computed_expressions {
1886            // Simple case: only columns, use existing projection logic
1887            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1888            return Ok(view.with_columns(column_indices));
1889        }
1890
1891        // Complex case: we have computed expressions
1892        // IMPORTANT: We create a PROJECTED view, not a new table
1893        // This preserves the original DataTable reference
1894
1895        let source_table = view.source();
1896        let visible_rows = view.visible_row_indices();
1897
1898        // Create a temporary table just for the computed result view
1899        // But this table is only used for the current query result
1900        let mut computed_table = DataTable::new("query_result");
1901
1902        // First, expand any Star selectors to actual columns
1903        let mut expanded_items = Vec::new();
1904        for item in select_items {
1905            match item {
1906                SelectItem::Star { table_prefix, .. } => {
1907                    if let Some(prefix) = table_prefix {
1908                        // Scoped expansion: table.* expands only columns from that table
1909                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
1910                        for col in &source_table.columns {
1911                            if Self::column_matches_table(col, prefix) {
1912                                expanded_items.push(SelectItem::Column {
1913                                    column: ColumnRef::unquoted(col.name.clone()),
1914                                    leading_comments: vec![],
1915                                    trailing_comment: None,
1916                                });
1917                            }
1918                        }
1919                    } else {
1920                        // Unscoped expansion: * expands to all columns
1921                        debug!("QueryEngine::apply_select_items - expanding *");
1922                        for col_name in source_table.column_names() {
1923                            expanded_items.push(SelectItem::Column {
1924                                column: ColumnRef::unquoted(col_name.to_string()),
1925                                leading_comments: vec![],
1926                                trailing_comment: None,
1927                            });
1928                        }
1929                    }
1930                }
1931                _ => expanded_items.push(item.clone()),
1932            }
1933        }
1934
1935        // Add columns based on expanded SelectItems, handling duplicates
1936        let mut column_name_counts: std::collections::HashMap<String, usize> =
1937            std::collections::HashMap::new();
1938
1939        for item in &expanded_items {
1940            let base_name = match item {
1941                SelectItem::Column {
1942                    column: col_ref, ..
1943                } => col_ref.name.clone(),
1944                SelectItem::Expression { alias, .. } => alias.clone(),
1945                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1946                SelectItem::StarExclude { .. } => {
1947                    unreachable!("StarExclude should have been expanded")
1948                }
1949            };
1950
1951            // Check if this column name has been used before
1952            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1953            let column_name = if *count == 0 {
1954                // First occurrence, use the name as-is
1955                base_name.clone()
1956            } else {
1957                // Duplicate, append a suffix
1958                format!("{base_name}_{count}")
1959            };
1960            *count += 1;
1961
1962            computed_table.add_column(DataColumn::new(&column_name));
1963        }
1964
1965        // Check if batch evaluation can be used
1966        // Batch evaluation is the default but we need to check if all window functions
1967        // are standalone (not embedded in expressions)
1968        let can_use_batch = expanded_items.iter().all(|item| {
1969            match item {
1970                SelectItem::Expression { expr, .. } => {
1971                    // Only use batch evaluation if the expression IS a window function,
1972                    // not if it CONTAINS a window function
1973                    matches!(expr, SqlExpression::WindowFunction { .. })
1974                        || !Self::contains_window_function(expr)
1975                }
1976                _ => true, // Non-expressions are fine
1977            }
1978        });
1979
1980        // Batch evaluation is now the default mode for improved performance
1981        // Users can opt-out by setting SQL_CLI_BATCH_WINDOW=0 or false
1982        let use_batch_evaluation = can_use_batch
1983            && std::env::var("SQL_CLI_BATCH_WINDOW")
1984                .map(|v| v != "0" && v.to_lowercase() != "false")
1985                .unwrap_or(true);
1986
1987        // Store window specs for batch evaluation if needed
1988        let batch_window_specs = if use_batch_evaluation && has_window_functions {
1989            debug!("BATCH window function evaluation flag is enabled");
1990            // Extract window specs before timing starts
1991            let specs = Self::extract_window_specs(&expanded_items);
1992            debug!(
1993                "Extracted {} window function specs for batch evaluation",
1994                specs.len()
1995            );
1996            Some(specs)
1997        } else {
1998            None
1999        };
2000
2001        // Calculate values for each row
2002        let mut evaluator =
2003            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2004
2005        // Populate table aliases from exec_context if available
2006        if let Some(exec_ctx) = exec_context {
2007            let aliases = exec_ctx.get_aliases();
2008            if !aliases.is_empty() {
2009                debug!(
2010                    "Applying {} aliases to evaluator: {:?}",
2011                    aliases.len(),
2012                    aliases
2013                );
2014                evaluator = evaluator.with_table_aliases(aliases);
2015            }
2016        }
2017
2018        // OPTIMIZATION: Pre-create WindowContexts before the row loop
2019        // This avoids 50,000+ redundant context lookups
2020        if has_window_functions {
2021            let preload_start = Instant::now();
2022
2023            // Extract all unique WindowSpecs from SELECT items
2024            let mut window_specs = Vec::new();
2025            for item in &expanded_items {
2026                if let SelectItem::Expression { expr, .. } = item {
2027                    Self::collect_window_specs(expr, &mut window_specs);
2028                }
2029            }
2030
2031            // Pre-create all WindowContexts
2032            for spec in &window_specs {
2033                let _ = evaluator.get_or_create_window_context(spec);
2034            }
2035
2036            debug!(
2037                "Pre-created {} WindowContext(s) in {:.2}ms",
2038                window_specs.len(),
2039                preload_start.elapsed().as_secs_f64() * 1000.0
2040            );
2041        }
2042
2043        // Batch evaluation path for window functions
2044        if let Some(window_specs) = batch_window_specs {
2045            debug!("Starting batch window function evaluation");
2046            let batch_start = Instant::now();
2047
2048            // Initialize result table with all rows
2049            let mut batch_results: Vec<Vec<DataValue>> =
2050                vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2051
2052            // Use the window specs we extracted earlier
2053            let detailed_window_specs = &window_specs;
2054
2055            // Group window specs by their WindowSpec for batch processing
2056            let mut specs_by_window: HashMap<
2057                u64,
2058                Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2059            > = HashMap::new();
2060            for spec in detailed_window_specs {
2061                let hash = spec.spec.compute_hash();
2062                specs_by_window
2063                    .entry(hash)
2064                    .or_insert_with(Vec::new)
2065                    .push(spec);
2066            }
2067
2068            // Process each unique window specification
2069            for (_window_hash, specs) in specs_by_window {
2070                // Get the window context (already pre-created)
2071                let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2072
2073                // Process each function using this window
2074                for spec in specs {
2075                    match spec.function_name.as_str() {
2076                        "LAG" => {
2077                            // Extract column and offset from arguments
2078                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2079                                let column_name = col_ref.name.as_str();
2080                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2081                                    spec.args.get(1)
2082                                {
2083                                    n.parse::<i64>().unwrap_or(1)
2084                                } else {
2085                                    1 // default offset
2086                                };
2087
2088                                let values = context.evaluate_lag_batch(
2089                                    visible_rows,
2090                                    column_name,
2091                                    offset,
2092                                )?;
2093
2094                                // Write results to the output column
2095                                for (row_idx, value) in values.into_iter().enumerate() {
2096                                    batch_results[row_idx][spec.output_column_index] = value;
2097                                }
2098                            }
2099                        }
2100                        "LEAD" => {
2101                            // Extract column and offset from arguments
2102                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2103                                let column_name = col_ref.name.as_str();
2104                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2105                                    spec.args.get(1)
2106                                {
2107                                    n.parse::<i64>().unwrap_or(1)
2108                                } else {
2109                                    1 // default offset
2110                                };
2111
2112                                let values = context.evaluate_lead_batch(
2113                                    visible_rows,
2114                                    column_name,
2115                                    offset,
2116                                )?;
2117
2118                                // Write results to the output column
2119                                for (row_idx, value) in values.into_iter().enumerate() {
2120                                    batch_results[row_idx][spec.output_column_index] = value;
2121                                }
2122                            }
2123                        }
2124                        "ROW_NUMBER" => {
2125                            let values = context.evaluate_row_number_batch(visible_rows)?;
2126
2127                            // Write results to the output column
2128                            for (row_idx, value) in values.into_iter().enumerate() {
2129                                batch_results[row_idx][spec.output_column_index] = value;
2130                            }
2131                        }
2132                        "RANK" => {
2133                            let values = context.evaluate_rank_batch(visible_rows)?;
2134
2135                            // Write results to the output column
2136                            for (row_idx, value) in values.into_iter().enumerate() {
2137                                batch_results[row_idx][spec.output_column_index] = value;
2138                            }
2139                        }
2140                        "DENSE_RANK" => {
2141                            let values = context.evaluate_dense_rank_batch(visible_rows)?;
2142
2143                            // Write results to the output column
2144                            for (row_idx, value) in values.into_iter().enumerate() {
2145                                batch_results[row_idx][spec.output_column_index] = value;
2146                            }
2147                        }
2148                        "SUM" => {
2149                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2150                                let column_name = col_ref.name.as_str();
2151                                let values =
2152                                    context.evaluate_sum_batch(visible_rows, column_name)?;
2153
2154                                for (row_idx, value) in values.into_iter().enumerate() {
2155                                    batch_results[row_idx][spec.output_column_index] = value;
2156                                }
2157                            }
2158                        }
2159                        "AVG" => {
2160                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2161                                let column_name = col_ref.name.as_str();
2162                                let values =
2163                                    context.evaluate_avg_batch(visible_rows, column_name)?;
2164
2165                                for (row_idx, value) in values.into_iter().enumerate() {
2166                                    batch_results[row_idx][spec.output_column_index] = value;
2167                                }
2168                            }
2169                        }
2170                        "MIN" => {
2171                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2172                                let column_name = col_ref.name.as_str();
2173                                let values =
2174                                    context.evaluate_min_batch(visible_rows, column_name)?;
2175
2176                                for (row_idx, value) in values.into_iter().enumerate() {
2177                                    batch_results[row_idx][spec.output_column_index] = value;
2178                                }
2179                            }
2180                        }
2181                        "MAX" => {
2182                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2183                                let column_name = col_ref.name.as_str();
2184                                let values =
2185                                    context.evaluate_max_batch(visible_rows, column_name)?;
2186
2187                                for (row_idx, value) in values.into_iter().enumerate() {
2188                                    batch_results[row_idx][spec.output_column_index] = value;
2189                                }
2190                            }
2191                        }
2192                        "COUNT" => {
2193                            // COUNT can be COUNT(*) or COUNT(column)
2194                            let column_name = match spec.args.get(0) {
2195                                Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2196                                Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2197                                _ => None,
2198                            };
2199
2200                            let values = context.evaluate_count_batch(visible_rows, column_name)?;
2201
2202                            for (row_idx, value) in values.into_iter().enumerate() {
2203                                batch_results[row_idx][spec.output_column_index] = value;
2204                            }
2205                        }
2206                        "FIRST_VALUE" => {
2207                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2208                                let column_name = col_ref.name.as_str();
2209                                let values = context
2210                                    .evaluate_first_value_batch(visible_rows, column_name)?;
2211
2212                                for (row_idx, value) in values.into_iter().enumerate() {
2213                                    batch_results[row_idx][spec.output_column_index] = value;
2214                                }
2215                            }
2216                        }
2217                        "LAST_VALUE" => {
2218                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2219                                let column_name = col_ref.name.as_str();
2220                                let values =
2221                                    context.evaluate_last_value_batch(visible_rows, column_name)?;
2222
2223                                for (row_idx, value) in values.into_iter().enumerate() {
2224                                    batch_results[row_idx][spec.output_column_index] = value;
2225                                }
2226                            }
2227                        }
2228                        _ => {
2229                            // Fall back to per-row evaluation for unsupported functions
2230                            debug!(
2231                                "Window function {} not supported in batch mode, using per-row",
2232                                spec.function_name
2233                            );
2234                        }
2235                    }
2236                }
2237            }
2238
2239            // Now evaluate non-window columns
2240            for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2241                for (col_idx, item) in expanded_items.iter().enumerate() {
2242                    // Skip if this column was already filled by a window function
2243                    if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2244                        continue;
2245                    }
2246
2247                    let value = match item {
2248                        SelectItem::Column {
2249                            column: col_ref, ..
2250                        } => {
2251                            match evaluator
2252                                .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2253                            {
2254                                Ok(val) => val,
2255                                Err(e) => {
2256                                    return Err(anyhow!(
2257                                        "Failed to evaluate column {}: {}",
2258                                        col_ref.to_sql(),
2259                                        e
2260                                    ));
2261                                }
2262                            }
2263                        }
2264                        SelectItem::Expression { expr, .. } => {
2265                            // For batch evaluation, we need to handle expressions differently
2266                            // If this is just a window function by itself, we already computed it
2267                            if matches!(expr, SqlExpression::WindowFunction { .. }) {
2268                                // Pure window function - already handled in batch evaluation
2269                                continue;
2270                            }
2271                            // For expressions containing window functions or regular expressions,
2272                            // evaluate them normally
2273                            evaluator.evaluate(&expr, source_row_idx)?
2274                        }
2275                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2276                        SelectItem::StarExclude { .. } => {
2277                            unreachable!("StarExclude should have been expanded")
2278                        }
2279                    };
2280                    batch_results[result_row_idx][col_idx] = value;
2281                }
2282            }
2283
2284            // Add all rows to the table
2285            for row_values in batch_results {
2286                computed_table
2287                    .add_row(DataRow::new(row_values))
2288                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2289            }
2290
2291            debug!(
2292                "Batch window evaluation completed in {:.3}ms",
2293                batch_start.elapsed().as_secs_f64() * 1000.0
2294            );
2295        } else {
2296            // Original per-row evaluation path
2297            for &row_idx in visible_rows {
2298                let mut row_values = Vec::new();
2299
2300                for item in &expanded_items {
2301                    let value = match item {
2302                        SelectItem::Column {
2303                            column: col_ref, ..
2304                        } => {
2305                            // Use evaluator for column resolution (handles aliases properly)
2306                            match evaluator
2307                                .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2308                            {
2309                                Ok(val) => val,
2310                                Err(e) => {
2311                                    return Err(anyhow!(
2312                                        "Failed to evaluate column {}: {}",
2313                                        col_ref.to_sql(),
2314                                        e
2315                                    ));
2316                                }
2317                            }
2318                        }
2319                        SelectItem::Expression { expr, .. } => {
2320                            // Computed expression
2321                            evaluator.evaluate(&expr, row_idx)?
2322                        }
2323                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2324                        SelectItem::StarExclude { .. } => {
2325                            unreachable!("StarExclude should have been expanded")
2326                        }
2327                    };
2328                    row_values.push(value);
2329                }
2330
2331                computed_table
2332                    .add_row(DataRow::new(row_values))
2333                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2334            }
2335        }
2336
2337        // Log window function timing if applicable
2338        if let Some(start) = window_start {
2339            let window_duration = start.elapsed();
2340            info!(
2341                "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2342                window_duration.as_secs_f64() * 1000.0,
2343                visible_rows.len(),
2344                window_func_count
2345            );
2346
2347            // Add to execution plan
2348            plan.begin_step(
2349                StepType::WindowFunction,
2350                format!("Evaluate {} window function(s)", window_func_count),
2351            );
2352            plan.set_rows_in(visible_rows.len());
2353            plan.set_rows_out(visible_rows.len());
2354            plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2355            plan.add_detail(format!("{} window functions evaluated", window_func_count));
2356            plan.add_detail(format!(
2357                "Evaluation time: {:.3}ms",
2358                window_duration.as_secs_f64() * 1000.0
2359            ));
2360            plan.end_step();
2361        }
2362
2363        // Return a view of the computed result
2364        // This is a temporary view for this query only
2365        Ok(DataView::new(Arc::new(computed_table)))
2366    }
2367
2368    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
2369    fn apply_select_with_row_expansion(
2370        &self,
2371        view: DataView,
2372        select_items: &[SelectItem],
2373    ) -> Result<DataView> {
2374        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2375
2376        let source_table = view.source();
2377        let visible_rows = view.visible_row_indices();
2378        let expander_registry = RowExpanderRegistry::new();
2379
2380        // Create result table
2381        let mut result_table = DataTable::new("unnest_result");
2382
2383        // Expand * to columns and set up result columns
2384        let mut expanded_items = Vec::new();
2385        for item in select_items {
2386            match item {
2387                SelectItem::Star { table_prefix, .. } => {
2388                    if let Some(prefix) = table_prefix {
2389                        // Scoped expansion: table.* expands only columns from that table
2390                        debug!(
2391                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2392                            prefix
2393                        );
2394                        for col in &source_table.columns {
2395                            if Self::column_matches_table(col, prefix) {
2396                                expanded_items.push(SelectItem::Column {
2397                                    column: ColumnRef::unquoted(col.name.clone()),
2398                                    leading_comments: vec![],
2399                                    trailing_comment: None,
2400                                });
2401                            }
2402                        }
2403                    } else {
2404                        // Unscoped expansion: * expands to all columns
2405                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2406                        for col_name in source_table.column_names() {
2407                            expanded_items.push(SelectItem::Column {
2408                                column: ColumnRef::unquoted(col_name.to_string()),
2409                                leading_comments: vec![],
2410                                trailing_comment: None,
2411                            });
2412                        }
2413                    }
2414                }
2415                _ => expanded_items.push(item.clone()),
2416            }
2417        }
2418
2419        // Add columns to result table
2420        for item in &expanded_items {
2421            let column_name = match item {
2422                SelectItem::Column {
2423                    column: col_ref, ..
2424                } => col_ref.name.clone(),
2425                SelectItem::Expression { alias, .. } => alias.clone(),
2426                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2427                SelectItem::StarExclude { .. } => {
2428                    unreachable!("StarExclude should have been expanded")
2429                }
2430            };
2431            result_table.add_column(DataColumn::new(&column_name));
2432        }
2433
2434        // Process each input row
2435        let mut evaluator =
2436            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2437
2438        for &row_idx in visible_rows {
2439            // First pass: identify UNNEST expressions and collect their expansion arrays
2440            let mut unnest_expansions = Vec::new();
2441            let mut unnest_indices = Vec::new();
2442
2443            for (col_idx, item) in expanded_items.iter().enumerate() {
2444                if let SelectItem::Expression { expr, .. } = item {
2445                    if let Some(expansion_result) = self.try_expand_unnest(
2446                        &expr,
2447                        source_table,
2448                        row_idx,
2449                        &mut evaluator,
2450                        &expander_registry,
2451                    )? {
2452                        unnest_expansions.push(expansion_result);
2453                        unnest_indices.push(col_idx);
2454                    }
2455                }
2456            }
2457
2458            // Determine how many output rows to generate
2459            let expansion_count = if unnest_expansions.is_empty() {
2460                1 // No UNNEST, just one row
2461            } else {
2462                unnest_expansions
2463                    .iter()
2464                    .map(|exp| exp.row_count())
2465                    .max()
2466                    .unwrap_or(1)
2467            };
2468
2469            // Generate output rows
2470            for output_idx in 0..expansion_count {
2471                let mut row_values = Vec::new();
2472
2473                for (col_idx, item) in expanded_items.iter().enumerate() {
2474                    // Check if this column is an UNNEST column
2475                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2476
2477                    let value = if let Some(unnest_idx) = unnest_position {
2478                        // Get value from expansion array (or NULL if exhausted)
2479                        let expansion = &unnest_expansions[unnest_idx];
2480                        expansion
2481                            .values
2482                            .get(output_idx)
2483                            .cloned()
2484                            .unwrap_or(DataValue::Null)
2485                    } else {
2486                        // Regular column or non-UNNEST expression - replicate from input
2487                        match item {
2488                            SelectItem::Column {
2489                                column: col_ref, ..
2490                            } => {
2491                                let col_idx =
2492                                    source_table.get_column_index(&col_ref.name).ok_or_else(
2493                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2494                                    )?;
2495                                let row = source_table
2496                                    .get_row(row_idx)
2497                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2498                                row.get(col_idx)
2499                                    .ok_or_else(|| {
2500                                        anyhow::anyhow!("Column {} not found in row", col_idx)
2501                                    })?
2502                                    .clone()
2503                            }
2504                            SelectItem::Expression { expr, .. } => {
2505                                // Non-UNNEST expression - evaluate once and replicate
2506                                evaluator.evaluate(&expr, row_idx)?
2507                            }
2508                            SelectItem::Star { .. } => unreachable!(),
2509                            SelectItem::StarExclude { .. } => {
2510                                unreachable!("StarExclude should have been expanded")
2511                            }
2512                        }
2513                    };
2514
2515                    row_values.push(value);
2516                }
2517
2518                result_table
2519                    .add_row(DataRow::new(row_values))
2520                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2521            }
2522        }
2523
2524        debug!(
2525            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2526            visible_rows.len(),
2527            result_table.row_count()
2528        );
2529
2530        Ok(DataView::new(Arc::new(result_table)))
2531    }
2532
2533    /// Try to expand an expression if it's an UNNEST call
2534    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
2535    fn try_expand_unnest(
2536        &self,
2537        expr: &SqlExpression,
2538        _source_table: &DataTable,
2539        row_idx: usize,
2540        evaluator: &mut ArithmeticEvaluator,
2541        expander_registry: &RowExpanderRegistry,
2542    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2543        // Check for UNNEST variant (direct syntax)
2544        if let SqlExpression::Unnest { column, delimiter } = expr {
2545            // Evaluate the column expression
2546            let column_value = evaluator.evaluate(column, row_idx)?;
2547
2548            // Delimiter is already a string literal
2549            let delimiter_value = DataValue::String(delimiter.clone());
2550
2551            // Get the UNNEST expander
2552            let expander = expander_registry
2553                .get("UNNEST")
2554                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2555
2556            // Expand the value
2557            let expansion = expander.expand(&column_value, &[delimiter_value])?;
2558            return Ok(Some(expansion));
2559        }
2560
2561        // Also check for FunctionCall form (for compatibility)
2562        if let SqlExpression::FunctionCall { name, args, .. } = expr {
2563            if name.to_uppercase() == "UNNEST" {
2564                // UNNEST(column, delimiter)
2565                if args.len() != 2 {
2566                    return Err(anyhow::anyhow!(
2567                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2568                    ));
2569                }
2570
2571                // Evaluate the column expression (first arg)
2572                let column_value = evaluator.evaluate(&args[0], row_idx)?;
2573
2574                // Evaluate the delimiter expression (second arg)
2575                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2576
2577                // Get the UNNEST expander
2578                let expander = expander_registry
2579                    .get("UNNEST")
2580                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2581
2582                // Expand the value
2583                let expansion = expander.expand(&column_value, &[delimiter_value])?;
2584                return Ok(Some(expansion));
2585            }
2586        }
2587
2588        Ok(None)
2589    }
2590
2591    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
2592    fn apply_aggregate_select(
2593        &self,
2594        view: DataView,
2595        select_items: &[SelectItem],
2596    ) -> Result<DataView> {
2597        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2598
2599        let source_table = view.source();
2600        let mut result_table = DataTable::new("aggregate_result");
2601
2602        // Add columns for each select item
2603        for item in select_items {
2604            let column_name = match item {
2605                SelectItem::Expression { alias, .. } => alias.clone(),
2606                _ => unreachable!("Should only have expressions in aggregate-only query"),
2607            };
2608            result_table.add_column(DataColumn::new(&column_name));
2609        }
2610
2611        // Create evaluator with visible rows from the view (for filtered aggregates)
2612        let visible_rows = view.visible_row_indices().to_vec();
2613        let mut evaluator =
2614            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2615                .with_visible_rows(visible_rows);
2616
2617        // Evaluate each aggregate expression once (they handle all rows internally)
2618        let mut row_values = Vec::new();
2619        for item in select_items {
2620            match item {
2621                SelectItem::Expression { expr, .. } => {
2622                    // The evaluator will handle aggregates over all rows
2623                    // We pass row_index=0 but aggregates ignore it and process all rows
2624                    let value = evaluator.evaluate(expr, 0)?;
2625                    row_values.push(value);
2626                }
2627                _ => unreachable!("Should only have expressions in aggregate-only query"),
2628            }
2629        }
2630
2631        // Add the single result row
2632        result_table
2633            .add_row(DataRow::new(row_values))
2634            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2635
2636        Ok(DataView::new(Arc::new(result_table)))
2637    }
2638
2639    /// Check if a column belongs to a specific table based on source_table or qualified_name
2640    ///
2641    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2642    /// to filter which columns should be included.
2643    ///
2644    /// # Arguments
2645    /// * `col` - The column to check
2646    /// * `table_name` - The table name or alias to match against
2647    ///
2648    /// # Returns
2649    /// `true` if the column belongs to the specified table
2650    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2651        // First, check the source_table field
2652        if let Some(ref source) = col.source_table {
2653            // Direct match or matches with schema qualification
2654            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2655                return true;
2656            }
2657        }
2658
2659        // Second, check the qualified_name field
2660        if let Some(ref qualified) = col.qualified_name {
2661            // Check if qualified name starts with "table_name."
2662            if qualified.starts_with(&format!("{}.", table_name)) {
2663                return true;
2664            }
2665        }
2666
2667        false
2668    }
2669
2670    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2671    fn resolve_select_columns(
2672        &self,
2673        table: &DataTable,
2674        select_items: &[SelectItem],
2675    ) -> Result<Vec<usize>> {
2676        let mut indices = Vec::new();
2677        let table_columns = table.column_names();
2678
2679        for item in select_items {
2680            match item {
2681                SelectItem::Column {
2682                    column: col_ref, ..
2683                } => {
2684                    // Check if this has a table prefix
2685                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2686                        // For qualified references, ONLY try qualified lookup - no fallback
2687                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2688                        table.find_column_by_qualified_name(&qualified_name)
2689                            .ok_or_else(|| {
2690                                // Check if any columns have qualified names for better error message
2691                                let has_qualified = table.columns.iter()
2692                                    .any(|c| c.qualified_name.is_some());
2693                                if !has_qualified {
2694                                    anyhow::anyhow!(
2695                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2696                                        qualified_name, table_prefix
2697                                    )
2698                                } else {
2699                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2700                                }
2701                            })?
2702                    } else {
2703                        // Simple column name lookup
2704                        table_columns
2705                            .iter()
2706                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2707                            .ok_or_else(|| {
2708                                let suggestion = self.find_similar_column(table, &col_ref.name);
2709                                match suggestion {
2710                                    Some(similar) => anyhow::anyhow!(
2711                                        "Column '{}' not found. Did you mean '{}'?",
2712                                        col_ref.name,
2713                                        similar
2714                                    ),
2715                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2716                                }
2717                            })?
2718                    };
2719                    indices.push(index);
2720                }
2721                SelectItem::Star { table_prefix, .. } => {
2722                    if let Some(prefix) = table_prefix {
2723                        // Scoped expansion: table.* expands only columns from that table
2724                        for (i, col) in table.columns.iter().enumerate() {
2725                            if Self::column_matches_table(col, prefix) {
2726                                indices.push(i);
2727                            }
2728                        }
2729                    } else {
2730                        // Unscoped expansion: * expands to all column indices
2731                        for i in 0..table_columns.len() {
2732                            indices.push(i);
2733                        }
2734                    }
2735                }
2736                SelectItem::StarExclude {
2737                    table_prefix,
2738                    excluded_columns,
2739                    ..
2740                } => {
2741                    // Expand all columns (with optional table prefix), then exclude specified ones
2742                    if let Some(prefix) = table_prefix {
2743                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2744                        for (i, col) in table.columns.iter().enumerate() {
2745                            if Self::column_matches_table(col, prefix)
2746                                && !excluded_columns.contains(&col.name)
2747                            {
2748                                indices.push(i);
2749                            }
2750                        }
2751                    } else {
2752                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2753                        for (i, col_name) in table_columns.iter().enumerate() {
2754                            if !excluded_columns
2755                                .iter()
2756                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2757                            {
2758                                indices.push(i);
2759                            }
2760                        }
2761                    }
2762                }
2763                SelectItem::Expression { .. } => {
2764                    return Err(anyhow::anyhow!(
2765                        "Computed expressions require new table creation"
2766                    ));
2767                }
2768            }
2769        }
2770
2771        Ok(indices)
2772    }
2773
2774    /// Apply DISTINCT to remove duplicate rows
2775    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2776        use std::collections::HashSet;
2777
2778        let source = view.source();
2779        let visible_cols = view.visible_column_indices();
2780        let visible_rows = view.visible_row_indices();
2781
2782        // Build a set to track unique rows
2783        let mut seen_rows = HashSet::new();
2784        let mut unique_row_indices = Vec::new();
2785
2786        for &row_idx in visible_rows {
2787            // Build a key representing this row's visible column values
2788            let mut row_key = Vec::new();
2789            for &col_idx in visible_cols {
2790                let value = source
2791                    .get_value(row_idx, col_idx)
2792                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2793                // Convert value to a hashable representation
2794                row_key.push(format!("{:?}", value));
2795            }
2796
2797            // Check if we've seen this row before
2798            if seen_rows.insert(row_key) {
2799                // First time seeing this row combination
2800                unique_row_indices.push(row_idx);
2801            }
2802        }
2803
2804        // Create a new view with only unique rows
2805        Ok(view.with_rows(unique_row_indices))
2806    }
2807
2808    /// Apply multi-column ORDER BY sorting to the view
2809    fn apply_multi_order_by(
2810        &self,
2811        view: DataView,
2812        order_by_columns: &[OrderByItem],
2813    ) -> Result<DataView> {
2814        self.apply_multi_order_by_with_context(view, order_by_columns, None)
2815    }
2816
2817    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
2818    fn apply_multi_order_by_with_context(
2819        &self,
2820        mut view: DataView,
2821        order_by_columns: &[OrderByItem],
2822        _exec_context: Option<&ExecutionContext>,
2823    ) -> Result<DataView> {
2824        // Build list of (source_column_index, ascending) tuples
2825        let mut sort_columns = Vec::new();
2826
2827        for order_col in order_by_columns {
2828            // Extract column name from expression (currently only supports simple columns)
2829            let column_name = match &order_col.expr {
2830                SqlExpression::Column(col_ref) => col_ref.name.clone(),
2831                _ => {
2832                    // TODO: Support expression evaluation in ORDER BY
2833                    return Err(anyhow!(
2834                        "ORDER BY expressions not yet supported - only simple columns allowed"
2835                    ));
2836                }
2837            };
2838
2839            // Try to find the column index, handling qualified column names (table.column)
2840            let col_index = if column_name.contains('.') {
2841                // Qualified column name - extract unqualified part
2842                if let Some(dot_pos) = column_name.rfind('.') {
2843                    let col_name = &column_name[dot_pos + 1..];
2844
2845                    // After SELECT processing, columns are unqualified
2846                    // So just use the column name part
2847                    debug!(
2848                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
2849                        col_name, column_name
2850                    );
2851                    view.source().get_column_index(col_name)
2852                } else {
2853                    view.source().get_column_index(&column_name)
2854                }
2855            } else {
2856                // Simple column name
2857                view.source().get_column_index(&column_name)
2858            }
2859            .ok_or_else(|| {
2860                // If not found, provide helpful error with suggestions
2861                let suggestion = self.find_similar_column(view.source(), &column_name);
2862                match suggestion {
2863                    Some(similar) => anyhow::anyhow!(
2864                        "Column '{}' not found. Did you mean '{}'?",
2865                        column_name,
2866                        similar
2867                    ),
2868                    None => {
2869                        // Also list available columns for debugging
2870                        let available_cols = view.source().column_names().join(", ");
2871                        anyhow::anyhow!(
2872                            "Column '{}' not found. Available columns: {}",
2873                            column_name,
2874                            available_cols
2875                        )
2876                    }
2877                }
2878            })?;
2879
2880            let ascending = matches!(order_col.direction, SortDirection::Asc);
2881            sort_columns.push((col_index, ascending));
2882        }
2883
2884        // Apply multi-column sorting
2885        view.apply_multi_sort(&sort_columns)?;
2886        Ok(view)
2887    }
2888
2889    /// Apply GROUP BY to the view with optional HAVING clause
2890    fn apply_group_by(
2891        &self,
2892        view: DataView,
2893        group_by_exprs: &[SqlExpression],
2894        select_items: &[SelectItem],
2895        having: Option<&SqlExpression>,
2896        plan: &mut ExecutionPlanBuilder,
2897    ) -> Result<DataView> {
2898        // Use the new expression-based GROUP BY implementation
2899        let (result_view, phase_info) = self.apply_group_by_expressions(
2900            view,
2901            group_by_exprs,
2902            select_items,
2903            having,
2904            self.case_insensitive,
2905            self.date_notation.clone(),
2906        )?;
2907
2908        // Add detailed phase information to the execution plan
2909        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2910        plan.add_detail(format!(
2911            "Phase 1 - Group Building: {:.3}ms",
2912            phase_info.phase2_key_building.as_secs_f64() * 1000.0
2913        ));
2914        plan.add_detail(format!(
2915            "  • Processing {} rows into {} groups",
2916            phase_info.total_rows, phase_info.num_groups
2917        ));
2918        plan.add_detail(format!(
2919            "Phase 2 - Aggregation: {:.3}ms",
2920            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2921        ));
2922        if phase_info.phase4_having_evaluation > Duration::ZERO {
2923            plan.add_detail(format!(
2924                "Phase 3 - HAVING Filter: {:.3}ms",
2925                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2926            ));
2927            plan.add_detail(format!(
2928                "  • Filtered {} groups",
2929                phase_info.groups_filtered_by_having
2930            ));
2931        }
2932        plan.add_detail(format!(
2933            "Total GROUP BY time: {:.3}ms",
2934            phase_info.total_time.as_secs_f64() * 1000.0
2935        ));
2936
2937        Ok(result_view)
2938    }
2939
2940    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
2941    /// This helps pre-size hash tables for better performance
2942    pub fn estimate_group_cardinality(
2943        &self,
2944        view: &DataView,
2945        group_by_exprs: &[SqlExpression],
2946    ) -> usize {
2947        // If we have few rows, just return the row count as upper bound
2948        let row_count = view.get_visible_rows().len();
2949        if row_count <= 100 {
2950            return row_count;
2951        }
2952
2953        // Sample first 1000 rows or 10% of data, whichever is smaller
2954        let sample_size = min(1000, row_count / 10).max(100);
2955        let mut seen = FxHashSet::default();
2956
2957        let visible_rows = view.get_visible_rows();
2958        for (i, &row_idx) in visible_rows.iter().enumerate() {
2959            if i >= sample_size {
2960                break;
2961            }
2962
2963            // Evaluate GROUP BY expressions for this row
2964            let mut key_values = Vec::new();
2965            for expr in group_by_exprs {
2966                let mut evaluator = ArithmeticEvaluator::new(view.source());
2967                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2968                key_values.push(value);
2969            }
2970
2971            seen.insert(key_values);
2972        }
2973
2974        // Estimate total cardinality based on sample
2975        let sample_cardinality = seen.len();
2976        let estimated = (sample_cardinality * row_count) / sample_size;
2977
2978        // Cap at row count and ensure minimum of sample cardinality
2979        estimated.min(row_count).max(sample_cardinality)
2980    }
2981}
2982
2983#[cfg(test)]
2984mod tests {
2985    use super::*;
2986    use crate::data::datatable::{DataColumn, DataRow, DataValue};
2987
2988    fn create_test_table() -> Arc<DataTable> {
2989        let mut table = DataTable::new("test");
2990
2991        // Add columns
2992        table.add_column(DataColumn::new("id"));
2993        table.add_column(DataColumn::new("name"));
2994        table.add_column(DataColumn::new("age"));
2995
2996        // Add rows
2997        table
2998            .add_row(DataRow::new(vec![
2999                DataValue::Integer(1),
3000                DataValue::String("Alice".to_string()),
3001                DataValue::Integer(30),
3002            ]))
3003            .unwrap();
3004
3005        table
3006            .add_row(DataRow::new(vec![
3007                DataValue::Integer(2),
3008                DataValue::String("Bob".to_string()),
3009                DataValue::Integer(25),
3010            ]))
3011            .unwrap();
3012
3013        table
3014            .add_row(DataRow::new(vec![
3015                DataValue::Integer(3),
3016                DataValue::String("Charlie".to_string()),
3017                DataValue::Integer(35),
3018            ]))
3019            .unwrap();
3020
3021        Arc::new(table)
3022    }
3023
3024    #[test]
3025    fn test_select_all() {
3026        let table = create_test_table();
3027        let engine = QueryEngine::new();
3028
3029        let view = engine
3030            .execute(table.clone(), "SELECT * FROM users")
3031            .unwrap();
3032        assert_eq!(view.row_count(), 3);
3033        assert_eq!(view.column_count(), 3);
3034    }
3035
3036    #[test]
3037    fn test_select_columns() {
3038        let table = create_test_table();
3039        let engine = QueryEngine::new();
3040
3041        let view = engine
3042            .execute(table.clone(), "SELECT name, age FROM users")
3043            .unwrap();
3044        assert_eq!(view.row_count(), 3);
3045        assert_eq!(view.column_count(), 2);
3046    }
3047
3048    #[test]
3049    fn test_select_with_limit() {
3050        let table = create_test_table();
3051        let engine = QueryEngine::new();
3052
3053        let view = engine
3054            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3055            .unwrap();
3056        assert_eq!(view.row_count(), 2);
3057    }
3058
3059    #[test]
3060    fn test_type_coercion_contains() {
3061        // Initialize tracing for debug output
3062        let _ = tracing_subscriber::fmt()
3063            .with_max_level(tracing::Level::DEBUG)
3064            .try_init();
3065
3066        let mut table = DataTable::new("test");
3067        table.add_column(DataColumn::new("id"));
3068        table.add_column(DataColumn::new("status"));
3069        table.add_column(DataColumn::new("price"));
3070
3071        // Add test data with mixed types
3072        table
3073            .add_row(DataRow::new(vec![
3074                DataValue::Integer(1),
3075                DataValue::String("Pending".to_string()),
3076                DataValue::Float(99.99),
3077            ]))
3078            .unwrap();
3079
3080        table
3081            .add_row(DataRow::new(vec![
3082                DataValue::Integer(2),
3083                DataValue::String("Confirmed".to_string()),
3084                DataValue::Float(150.50),
3085            ]))
3086            .unwrap();
3087
3088        table
3089            .add_row(DataRow::new(vec![
3090                DataValue::Integer(3),
3091                DataValue::String("Pending".to_string()),
3092                DataValue::Float(75.00),
3093            ]))
3094            .unwrap();
3095
3096        let table = Arc::new(table);
3097        let engine = QueryEngine::new();
3098
3099        println!("\n=== Testing WHERE clause with Contains ===");
3100        println!("Table has {} rows", table.row_count());
3101        for i in 0..table.row_count() {
3102            let status = table.get_value(i, 1);
3103            println!("Row {i}: status = {status:?}");
3104        }
3105
3106        // Test 1: Basic string contains (should work)
3107        println!("\n--- Test 1: status.Contains('pend') ---");
3108        let result = engine.execute(
3109            table.clone(),
3110            "SELECT * FROM test WHERE status.Contains('pend')",
3111        );
3112        match result {
3113            Ok(view) => {
3114                println!("SUCCESS: Found {} matching rows", view.row_count());
3115                assert_eq!(view.row_count(), 2); // Should find both Pending rows
3116            }
3117            Err(e) => {
3118                panic!("Query failed: {e}");
3119            }
3120        }
3121
3122        // Test 2: Numeric contains (should work with type coercion)
3123        println!("\n--- Test 2: price.Contains('9') ---");
3124        let result = engine.execute(
3125            table.clone(),
3126            "SELECT * FROM test WHERE price.Contains('9')",
3127        );
3128        match result {
3129            Ok(view) => {
3130                println!(
3131                    "SUCCESS: Found {} matching rows with price containing '9'",
3132                    view.row_count()
3133                );
3134                // Should find 99.99 row
3135                assert!(view.row_count() >= 1);
3136            }
3137            Err(e) => {
3138                panic!("Numeric coercion query failed: {e}");
3139            }
3140        }
3141
3142        println!("\n=== All tests passed! ===");
3143    }
3144
3145    #[test]
3146    fn test_not_in_clause() {
3147        // Initialize tracing for debug output
3148        let _ = tracing_subscriber::fmt()
3149            .with_max_level(tracing::Level::DEBUG)
3150            .try_init();
3151
3152        let mut table = DataTable::new("test");
3153        table.add_column(DataColumn::new("id"));
3154        table.add_column(DataColumn::new("country"));
3155
3156        // Add test data
3157        table
3158            .add_row(DataRow::new(vec![
3159                DataValue::Integer(1),
3160                DataValue::String("CA".to_string()),
3161            ]))
3162            .unwrap();
3163
3164        table
3165            .add_row(DataRow::new(vec![
3166                DataValue::Integer(2),
3167                DataValue::String("US".to_string()),
3168            ]))
3169            .unwrap();
3170
3171        table
3172            .add_row(DataRow::new(vec![
3173                DataValue::Integer(3),
3174                DataValue::String("UK".to_string()),
3175            ]))
3176            .unwrap();
3177
3178        let table = Arc::new(table);
3179        let engine = QueryEngine::new();
3180
3181        println!("\n=== Testing NOT IN clause ===");
3182        println!("Table has {} rows", table.row_count());
3183        for i in 0..table.row_count() {
3184            let country = table.get_value(i, 1);
3185            println!("Row {i}: country = {country:?}");
3186        }
3187
3188        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
3189        println!("\n--- Test: country NOT IN ('CA') ---");
3190        let result = engine.execute(
3191            table.clone(),
3192            "SELECT * FROM test WHERE country NOT IN ('CA')",
3193        );
3194        match result {
3195            Ok(view) => {
3196                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3197                assert_eq!(view.row_count(), 2); // Should find US and UK
3198            }
3199            Err(e) => {
3200                panic!("NOT IN query failed: {e}");
3201            }
3202        }
3203
3204        println!("\n=== NOT IN test complete! ===");
3205    }
3206
3207    #[test]
3208    fn test_case_insensitive_in_and_not_in() {
3209        // Initialize tracing for debug output
3210        let _ = tracing_subscriber::fmt()
3211            .with_max_level(tracing::Level::DEBUG)
3212            .try_init();
3213
3214        let mut table = DataTable::new("test");
3215        table.add_column(DataColumn::new("id"));
3216        table.add_column(DataColumn::new("country"));
3217
3218        // Add test data with mixed case
3219        table
3220            .add_row(DataRow::new(vec![
3221                DataValue::Integer(1),
3222                DataValue::String("CA".to_string()), // uppercase
3223            ]))
3224            .unwrap();
3225
3226        table
3227            .add_row(DataRow::new(vec![
3228                DataValue::Integer(2),
3229                DataValue::String("us".to_string()), // lowercase
3230            ]))
3231            .unwrap();
3232
3233        table
3234            .add_row(DataRow::new(vec![
3235                DataValue::Integer(3),
3236                DataValue::String("UK".to_string()), // uppercase
3237            ]))
3238            .unwrap();
3239
3240        let table = Arc::new(table);
3241
3242        println!("\n=== Testing Case-Insensitive IN clause ===");
3243        println!("Table has {} rows", table.row_count());
3244        for i in 0..table.row_count() {
3245            let country = table.get_value(i, 1);
3246            println!("Row {i}: country = {country:?}");
3247        }
3248
3249        // Test case-insensitive IN - should match 'CA' with 'ca'
3250        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3251        let engine = QueryEngine::with_case_insensitive(true);
3252        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3253        match result {
3254            Ok(view) => {
3255                println!(
3256                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3257                    view.row_count()
3258                );
3259                assert_eq!(view.row_count(), 1); // Should find CA row
3260            }
3261            Err(e) => {
3262                panic!("Case-insensitive IN query failed: {e}");
3263            }
3264        }
3265
3266        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
3267        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3268        let result = engine.execute(
3269            table.clone(),
3270            "SELECT * FROM test WHERE country NOT IN ('ca')",
3271        );
3272        match result {
3273            Ok(view) => {
3274                println!(
3275                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3276                    view.row_count()
3277                );
3278                assert_eq!(view.row_count(), 2); // Should find us and UK rows
3279            }
3280            Err(e) => {
3281                panic!("Case-insensitive NOT IN query failed: {e}");
3282            }
3283        }
3284
3285        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
3286        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3287        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
3288        let result = engine_case_sensitive
3289            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3290        match result {
3291            Ok(view) => {
3292                println!(
3293                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3294                    view.row_count()
3295                );
3296                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
3297            }
3298            Err(e) => {
3299                panic!("Case-sensitive IN query failed: {e}");
3300            }
3301        }
3302
3303        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3304    }
3305
3306    #[test]
3307    #[ignore = "Parentheses in WHERE clause not yet implemented"]
3308    fn test_parentheses_in_where_clause() {
3309        // Initialize tracing for debug output
3310        let _ = tracing_subscriber::fmt()
3311            .with_max_level(tracing::Level::DEBUG)
3312            .try_init();
3313
3314        let mut table = DataTable::new("test");
3315        table.add_column(DataColumn::new("id"));
3316        table.add_column(DataColumn::new("status"));
3317        table.add_column(DataColumn::new("priority"));
3318
3319        // Add test data
3320        table
3321            .add_row(DataRow::new(vec![
3322                DataValue::Integer(1),
3323                DataValue::String("Pending".to_string()),
3324                DataValue::String("High".to_string()),
3325            ]))
3326            .unwrap();
3327
3328        table
3329            .add_row(DataRow::new(vec![
3330                DataValue::Integer(2),
3331                DataValue::String("Complete".to_string()),
3332                DataValue::String("High".to_string()),
3333            ]))
3334            .unwrap();
3335
3336        table
3337            .add_row(DataRow::new(vec![
3338                DataValue::Integer(3),
3339                DataValue::String("Pending".to_string()),
3340                DataValue::String("Low".to_string()),
3341            ]))
3342            .unwrap();
3343
3344        table
3345            .add_row(DataRow::new(vec![
3346                DataValue::Integer(4),
3347                DataValue::String("Complete".to_string()),
3348                DataValue::String("Low".to_string()),
3349            ]))
3350            .unwrap();
3351
3352        let table = Arc::new(table);
3353        let engine = QueryEngine::new();
3354
3355        println!("\n=== Testing Parentheses in WHERE clause ===");
3356        println!("Table has {} rows", table.row_count());
3357        for i in 0..table.row_count() {
3358            let status = table.get_value(i, 1);
3359            let priority = table.get_value(i, 2);
3360            println!("Row {i}: status = {status:?}, priority = {priority:?}");
3361        }
3362
3363        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
3364        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3365        let result = engine.execute(
3366            table.clone(),
3367            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3368        );
3369        match result {
3370            Ok(view) => {
3371                println!(
3372                    "SUCCESS: Found {} rows with parenthetical logic",
3373                    view.row_count()
3374                );
3375                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
3376            }
3377            Err(e) => {
3378                panic!("Parentheses query failed: {e}");
3379            }
3380        }
3381
3382        println!("\n=== Parentheses test complete! ===");
3383    }
3384
3385    #[test]
3386    #[ignore = "Numeric type coercion needs fixing"]
3387    fn test_numeric_type_coercion() {
3388        // Initialize tracing for debug output
3389        let _ = tracing_subscriber::fmt()
3390            .with_max_level(tracing::Level::DEBUG)
3391            .try_init();
3392
3393        let mut table = DataTable::new("test");
3394        table.add_column(DataColumn::new("id"));
3395        table.add_column(DataColumn::new("price"));
3396        table.add_column(DataColumn::new("quantity"));
3397
3398        // Add test data with different numeric types
3399        table
3400            .add_row(DataRow::new(vec![
3401                DataValue::Integer(1),
3402                DataValue::Float(99.50), // Contains '.'
3403                DataValue::Integer(100),
3404            ]))
3405            .unwrap();
3406
3407        table
3408            .add_row(DataRow::new(vec![
3409                DataValue::Integer(2),
3410                DataValue::Float(150.0), // Contains '.' and '0'
3411                DataValue::Integer(200),
3412            ]))
3413            .unwrap();
3414
3415        table
3416            .add_row(DataRow::new(vec![
3417                DataValue::Integer(3),
3418                DataValue::Integer(75), // No decimal point
3419                DataValue::Integer(50),
3420            ]))
3421            .unwrap();
3422
3423        let table = Arc::new(table);
3424        let engine = QueryEngine::new();
3425
3426        println!("\n=== Testing Numeric Type Coercion ===");
3427        println!("Table has {} rows", table.row_count());
3428        for i in 0..table.row_count() {
3429            let price = table.get_value(i, 1);
3430            let quantity = table.get_value(i, 2);
3431            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3432        }
3433
3434        // Test Contains on float values - should find rows with decimal points
3435        println!("\n--- Test: price.Contains('.') ---");
3436        let result = engine.execute(
3437            table.clone(),
3438            "SELECT * FROM test WHERE price.Contains('.')",
3439        );
3440        match result {
3441            Ok(view) => {
3442                println!(
3443                    "SUCCESS: Found {} rows with decimal points in price",
3444                    view.row_count()
3445                );
3446                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
3447            }
3448            Err(e) => {
3449                panic!("Numeric Contains query failed: {e}");
3450            }
3451        }
3452
3453        // Test Contains on integer values converted to string
3454        println!("\n--- Test: quantity.Contains('0') ---");
3455        let result = engine.execute(
3456            table.clone(),
3457            "SELECT * FROM test WHERE quantity.Contains('0')",
3458        );
3459        match result {
3460            Ok(view) => {
3461                println!(
3462                    "SUCCESS: Found {} rows with '0' in quantity",
3463                    view.row_count()
3464                );
3465                assert_eq!(view.row_count(), 2); // Should find 100 and 200
3466            }
3467            Err(e) => {
3468                panic!("Integer Contains query failed: {e}");
3469            }
3470        }
3471
3472        println!("\n=== Numeric type coercion test complete! ===");
3473    }
3474
3475    #[test]
3476    fn test_datetime_comparisons() {
3477        // Initialize tracing for debug output
3478        let _ = tracing_subscriber::fmt()
3479            .with_max_level(tracing::Level::DEBUG)
3480            .try_init();
3481
3482        let mut table = DataTable::new("test");
3483        table.add_column(DataColumn::new("id"));
3484        table.add_column(DataColumn::new("created_date"));
3485
3486        // Add test data with date strings (as they would come from CSV)
3487        table
3488            .add_row(DataRow::new(vec![
3489                DataValue::Integer(1),
3490                DataValue::String("2024-12-15".to_string()),
3491            ]))
3492            .unwrap();
3493
3494        table
3495            .add_row(DataRow::new(vec![
3496                DataValue::Integer(2),
3497                DataValue::String("2025-01-15".to_string()),
3498            ]))
3499            .unwrap();
3500
3501        table
3502            .add_row(DataRow::new(vec![
3503                DataValue::Integer(3),
3504                DataValue::String("2025-02-15".to_string()),
3505            ]))
3506            .unwrap();
3507
3508        let table = Arc::new(table);
3509        let engine = QueryEngine::new();
3510
3511        println!("\n=== Testing DateTime Comparisons ===");
3512        println!("Table has {} rows", table.row_count());
3513        for i in 0..table.row_count() {
3514            let date = table.get_value(i, 1);
3515            println!("Row {i}: created_date = {date:?}");
3516        }
3517
3518        // Test DateTime constructor comparison - should find dates after 2025-01-01
3519        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3520        let result = engine.execute(
3521            table.clone(),
3522            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3523        );
3524        match result {
3525            Ok(view) => {
3526                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3527                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
3528            }
3529            Err(e) => {
3530                panic!("DateTime comparison query failed: {e}");
3531            }
3532        }
3533
3534        println!("\n=== DateTime comparison test complete! ===");
3535    }
3536
3537    #[test]
3538    fn test_not_with_method_calls() {
3539        // Initialize tracing for debug output
3540        let _ = tracing_subscriber::fmt()
3541            .with_max_level(tracing::Level::DEBUG)
3542            .try_init();
3543
3544        let mut table = DataTable::new("test");
3545        table.add_column(DataColumn::new("id"));
3546        table.add_column(DataColumn::new("status"));
3547
3548        // Add test data
3549        table
3550            .add_row(DataRow::new(vec![
3551                DataValue::Integer(1),
3552                DataValue::String("Pending Review".to_string()),
3553            ]))
3554            .unwrap();
3555
3556        table
3557            .add_row(DataRow::new(vec![
3558                DataValue::Integer(2),
3559                DataValue::String("Complete".to_string()),
3560            ]))
3561            .unwrap();
3562
3563        table
3564            .add_row(DataRow::new(vec![
3565                DataValue::Integer(3),
3566                DataValue::String("Pending Approval".to_string()),
3567            ]))
3568            .unwrap();
3569
3570        let table = Arc::new(table);
3571        let engine = QueryEngine::with_case_insensitive(true);
3572
3573        println!("\n=== Testing NOT with Method Calls ===");
3574        println!("Table has {} rows", table.row_count());
3575        for i in 0..table.row_count() {
3576            let status = table.get_value(i, 1);
3577            println!("Row {i}: status = {status:?}");
3578        }
3579
3580        // Test NOT with Contains - should exclude rows containing "pend"
3581        println!("\n--- Test: NOT status.Contains('pend') ---");
3582        let result = engine.execute(
3583            table.clone(),
3584            "SELECT * FROM test WHERE NOT status.Contains('pend')",
3585        );
3586        match result {
3587            Ok(view) => {
3588                println!(
3589                    "SUCCESS: Found {} rows NOT containing 'pend'",
3590                    view.row_count()
3591                );
3592                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3593            }
3594            Err(e) => {
3595                panic!("NOT Contains query failed: {e}");
3596            }
3597        }
3598
3599        // Test NOT with StartsWith
3600        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3601        let result = engine.execute(
3602            table.clone(),
3603            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3604        );
3605        match result {
3606            Ok(view) => {
3607                println!(
3608                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
3609                    view.row_count()
3610                );
3611                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3612            }
3613            Err(e) => {
3614                panic!("NOT StartsWith query failed: {e}");
3615            }
3616        }
3617
3618        println!("\n=== NOT with method calls test complete! ===");
3619    }
3620
3621    #[test]
3622    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3623    fn test_complex_logical_expressions() {
3624        // Initialize tracing for debug output
3625        let _ = tracing_subscriber::fmt()
3626            .with_max_level(tracing::Level::DEBUG)
3627            .try_init();
3628
3629        let mut table = DataTable::new("test");
3630        table.add_column(DataColumn::new("id"));
3631        table.add_column(DataColumn::new("status"));
3632        table.add_column(DataColumn::new("priority"));
3633        table.add_column(DataColumn::new("assigned"));
3634
3635        // Add comprehensive test data
3636        table
3637            .add_row(DataRow::new(vec![
3638                DataValue::Integer(1),
3639                DataValue::String("Pending".to_string()),
3640                DataValue::String("High".to_string()),
3641                DataValue::String("John".to_string()),
3642            ]))
3643            .unwrap();
3644
3645        table
3646            .add_row(DataRow::new(vec![
3647                DataValue::Integer(2),
3648                DataValue::String("Complete".to_string()),
3649                DataValue::String("High".to_string()),
3650                DataValue::String("Jane".to_string()),
3651            ]))
3652            .unwrap();
3653
3654        table
3655            .add_row(DataRow::new(vec![
3656                DataValue::Integer(3),
3657                DataValue::String("Pending".to_string()),
3658                DataValue::String("Low".to_string()),
3659                DataValue::String("John".to_string()),
3660            ]))
3661            .unwrap();
3662
3663        table
3664            .add_row(DataRow::new(vec![
3665                DataValue::Integer(4),
3666                DataValue::String("In Progress".to_string()),
3667                DataValue::String("Medium".to_string()),
3668                DataValue::String("Jane".to_string()),
3669            ]))
3670            .unwrap();
3671
3672        let table = Arc::new(table);
3673        let engine = QueryEngine::new();
3674
3675        println!("\n=== Testing Complex Logical Expressions ===");
3676        println!("Table has {} rows", table.row_count());
3677        for i in 0..table.row_count() {
3678            let status = table.get_value(i, 1);
3679            let priority = table.get_value(i, 2);
3680            let assigned = table.get_value(i, 3);
3681            println!(
3682                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3683            );
3684        }
3685
3686        // Test complex AND/OR logic
3687        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3688        let result = engine.execute(
3689            table.clone(),
3690            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3691        );
3692        match result {
3693            Ok(view) => {
3694                println!(
3695                    "SUCCESS: Found {} rows with complex logic",
3696                    view.row_count()
3697                );
3698                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3699            }
3700            Err(e) => {
3701                panic!("Complex logic query failed: {e}");
3702            }
3703        }
3704
3705        // Test NOT with complex expressions
3706        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3707        let result = engine.execute(
3708            table.clone(),
3709            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3710        );
3711        match result {
3712            Ok(view) => {
3713                println!(
3714                    "SUCCESS: Found {} rows with NOT complex logic",
3715                    view.row_count()
3716                );
3717                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3718            }
3719            Err(e) => {
3720                panic!("NOT complex logic query failed: {e}");
3721            }
3722        }
3723
3724        println!("\n=== Complex logical expressions test complete! ===");
3725    }
3726
3727    #[test]
3728    fn test_mixed_data_types_and_edge_cases() {
3729        // Initialize tracing for debug output
3730        let _ = tracing_subscriber::fmt()
3731            .with_max_level(tracing::Level::DEBUG)
3732            .try_init();
3733
3734        let mut table = DataTable::new("test");
3735        table.add_column(DataColumn::new("id"));
3736        table.add_column(DataColumn::new("value"));
3737        table.add_column(DataColumn::new("nullable_field"));
3738
3739        // Add test data with mixed types and edge cases
3740        table
3741            .add_row(DataRow::new(vec![
3742                DataValue::Integer(1),
3743                DataValue::String("123.45".to_string()),
3744                DataValue::String("present".to_string()),
3745            ]))
3746            .unwrap();
3747
3748        table
3749            .add_row(DataRow::new(vec![
3750                DataValue::Integer(2),
3751                DataValue::Float(678.90),
3752                DataValue::Null,
3753            ]))
3754            .unwrap();
3755
3756        table
3757            .add_row(DataRow::new(vec![
3758                DataValue::Integer(3),
3759                DataValue::Boolean(true),
3760                DataValue::String("also present".to_string()),
3761            ]))
3762            .unwrap();
3763
3764        table
3765            .add_row(DataRow::new(vec![
3766                DataValue::Integer(4),
3767                DataValue::String("false".to_string()),
3768                DataValue::Null,
3769            ]))
3770            .unwrap();
3771
3772        let table = Arc::new(table);
3773        let engine = QueryEngine::new();
3774
3775        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3776        println!("Table has {} rows", table.row_count());
3777        for i in 0..table.row_count() {
3778            let value = table.get_value(i, 1);
3779            let nullable = table.get_value(i, 2);
3780            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3781        }
3782
3783        // Test type coercion with boolean Contains
3784        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3785        let result = engine.execute(
3786            table.clone(),
3787            "SELECT * FROM test WHERE value.Contains('true')",
3788        );
3789        match result {
3790            Ok(view) => {
3791                println!(
3792                    "SUCCESS: Found {} rows with boolean coercion",
3793                    view.row_count()
3794                );
3795                assert_eq!(view.row_count(), 1); // Should find the boolean true row
3796            }
3797            Err(e) => {
3798                panic!("Boolean coercion query failed: {e}");
3799            }
3800        }
3801
3802        // Test multiple IN values with mixed types
3803        println!("\n--- Test: id IN (1, 3) ---");
3804        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3805        match result {
3806            Ok(view) => {
3807                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3808                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
3809            }
3810            Err(e) => {
3811                panic!("Multiple IN values query failed: {e}");
3812            }
3813        }
3814
3815        println!("\n=== Mixed data types test complete! ===");
3816    }
3817
3818    /// Test that aggregate-only queries return exactly one row (regression test)
3819    #[test]
3820    fn test_aggregate_only_single_row() {
3821        let table = create_test_stock_data();
3822        let engine = QueryEngine::new();
3823
3824        // Test query with multiple aggregates - should return exactly 1 row
3825        let result = engine
3826            .execute(
3827                table.clone(),
3828                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3829            )
3830            .expect("Query should succeed");
3831
3832        assert_eq!(
3833            result.row_count(),
3834            1,
3835            "Aggregate-only query should return exactly 1 row"
3836        );
3837        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3838
3839        // Verify the actual values are correct
3840        let source = result.source();
3841        let row = source.get_row(0).expect("Should have first row");
3842
3843        // COUNT(*) should be 5 (total rows)
3844        assert_eq!(row.values[0], DataValue::Integer(5));
3845
3846        // MIN should be 99.5
3847        assert_eq!(row.values[1], DataValue::Float(99.5));
3848
3849        // MAX should be 105.0
3850        assert_eq!(row.values[2], DataValue::Float(105.0));
3851
3852        // AVG should be approximately 102.4
3853        if let DataValue::Float(avg) = &row.values[3] {
3854            assert!(
3855                (avg - 102.4).abs() < 0.01,
3856                "Average should be approximately 102.4, got {}",
3857                avg
3858            );
3859        } else {
3860            panic!("AVG should return a Float value");
3861        }
3862    }
3863
3864    /// Test single aggregate function returns single row
3865    #[test]
3866    fn test_single_aggregate_single_row() {
3867        let table = create_test_stock_data();
3868        let engine = QueryEngine::new();
3869
3870        let result = engine
3871            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3872            .expect("Query should succeed");
3873
3874        assert_eq!(
3875            result.row_count(),
3876            1,
3877            "Single aggregate query should return exactly 1 row"
3878        );
3879        assert_eq!(result.column_count(), 1, "Should have 1 column");
3880
3881        let source = result.source();
3882        let row = source.get_row(0).expect("Should have first row");
3883        assert_eq!(row.values[0], DataValue::Integer(5));
3884    }
3885
3886    /// Test aggregate with WHERE clause filtering
3887    #[test]
3888    fn test_aggregate_with_where_single_row() {
3889        let table = create_test_stock_data();
3890        let engine = QueryEngine::new();
3891
3892        // Filter to only high-value stocks (>= 103.0) and aggregate
3893        let result = engine
3894            .execute(
3895                table.clone(),
3896                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3897            )
3898            .expect("Query should succeed");
3899
3900        assert_eq!(
3901            result.row_count(),
3902            1,
3903            "Filtered aggregate query should return exactly 1 row"
3904        );
3905        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3906
3907        let source = result.source();
3908        let row = source.get_row(0).expect("Should have first row");
3909
3910        // Should find 2 rows (103.5 and 105.0)
3911        assert_eq!(row.values[0], DataValue::Integer(2));
3912        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
3913        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
3914    }
3915
3916    #[test]
3917    fn test_not_in_parsing() {
3918        use crate::sql::recursive_parser::Parser;
3919
3920        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3921        println!("\n=== Testing NOT IN parsing ===");
3922        println!("Parsing query: {query}");
3923
3924        let mut parser = Parser::new(query);
3925        match parser.parse() {
3926            Ok(statement) => {
3927                println!("Parsed statement: {statement:#?}");
3928                if let Some(where_clause) = statement.where_clause {
3929                    println!("WHERE conditions: {:#?}", where_clause.conditions);
3930                    if let Some(first_condition) = where_clause.conditions.first() {
3931                        println!("First condition expression: {:#?}", first_condition.expr);
3932                    }
3933                }
3934            }
3935            Err(e) => {
3936                panic!("Parse error: {e}");
3937            }
3938        }
3939    }
3940
3941    /// Create test stock data for aggregate testing
3942    fn create_test_stock_data() -> Arc<DataTable> {
3943        let mut table = DataTable::new("stock");
3944
3945        table.add_column(DataColumn::new("symbol"));
3946        table.add_column(DataColumn::new("close"));
3947        table.add_column(DataColumn::new("volume"));
3948
3949        // Add 5 rows of test data
3950        let test_data = vec![
3951            ("AAPL", 99.5, 1000),
3952            ("AAPL", 101.2, 1500),
3953            ("AAPL", 103.5, 2000),
3954            ("AAPL", 105.0, 1200),
3955            ("AAPL", 102.8, 1800),
3956        ];
3957
3958        for (symbol, close, volume) in test_data {
3959            table
3960                .add_row(DataRow::new(vec![
3961                    DataValue::String(symbol.to_string()),
3962                    DataValue::Float(close),
3963                    DataValue::Integer(volume),
3964                ]))
3965                .expect("Should add row successfully");
3966        }
3967
3968        Arc::new(table)
3969    }
3970}
3971
3972#[cfg(test)]
3973#[path = "query_engine_tests.rs"]
3974mod query_engine_tests;