Skip to main content

sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29    TableFunction,
30};
31
32/// Execution context for tracking table aliases and scope during query execution
33#[derive(Debug, Clone)]
34pub struct ExecutionContext {
35    /// Map from alias to actual table/CTE name
36    /// Example: "t" -> "#tmp_trades", "a" -> "data"
37    alias_map: HashMap<String, String>,
38}
39
40impl ExecutionContext {
41    /// Create a new empty execution context
42    pub fn new() -> Self {
43        Self {
44            alias_map: HashMap::new(),
45        }
46    }
47
48    /// Register a table alias
49    pub fn register_alias(&mut self, alias: String, table_name: String) {
50        debug!("Registering alias: {} -> {}", alias, table_name);
51        self.alias_map.insert(alias, table_name);
52    }
53
54    /// Resolve an alias to its actual table name
55    /// Returns the alias itself if not found in the map
56    pub fn resolve_alias(&self, name: &str) -> String {
57        self.alias_map
58            .get(name)
59            .cloned()
60            .unwrap_or_else(|| name.to_string())
61    }
62
63    /// Check if a name is a registered alias
64    pub fn is_alias(&self, name: &str) -> bool {
65        self.alias_map.contains_key(name)
66    }
67
68    /// Get a copy of all registered aliases
69    pub fn get_aliases(&self) -> HashMap<String, String> {
70        self.alias_map.clone()
71    }
72
73    /// Resolve a column reference to its index in the table, handling aliases
74    ///
75    /// This is the unified column resolution function that should be used by all
76    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
77    /// alias resolution behavior.
78    ///
79    /// Resolution strategy:
80    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
81    ///    a. Resolve the alias: t -> actual_table_name
82    ///    b. Try qualified lookup: "actual_table_name.amount"
83    ///    c. Fall back to unqualified: "amount"
84    /// 2. If column_ref has no prefix:
85    ///    a. Try simple column name lookup: "amount"
86    ///    b. Try as qualified name if it contains a dot: "table.column"
87    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88        if let Some(table_prefix) = &column_ref.table_prefix {
89            // Qualified column reference: resolve the alias first
90            let actual_table = self.resolve_alias(table_prefix);
91
92            // Try qualified lookup: "actual_table.column"
93            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95                debug!(
96                    "Resolved {}.{} -> qualified column '{}' at index {}",
97                    table_prefix, column_ref.name, qualified_name, idx
98                );
99                return Ok(idx);
100            }
101
102            // Fall back to unqualified lookup
103            if let Some(idx) = table.get_column_index(&column_ref.name) {
104                debug!(
105                    "Resolved {}.{} -> unqualified column '{}' at index {}",
106                    table_prefix, column_ref.name, column_ref.name, idx
107                );
108                return Ok(idx);
109            }
110
111            // Not found with either qualified or unqualified name
112            Err(anyhow!(
113                "Column '{}' not found. Table '{}' may not support qualified column names",
114                qualified_name,
115                actual_table
116            ))
117        } else {
118            // Unqualified column reference
119            if let Some(idx) = table.get_column_index(&column_ref.name) {
120                debug!(
121                    "Resolved unqualified column '{}' at index {}",
122                    column_ref.name, idx
123                );
124                return Ok(idx);
125            }
126
127            // If the column name contains a dot, try it as a qualified name
128            if column_ref.name.contains('.') {
129                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130                    debug!(
131                        "Resolved '{}' as qualified column at index {}",
132                        column_ref.name, idx
133                    );
134                    return Ok(idx);
135                }
136            }
137
138            // Column not found - provide helpful error
139            let suggestion = self.find_similar_column(table, &column_ref.name);
140            match suggestion {
141                Some(similar) => Err(anyhow!(
142                    "Column '{}' not found. Did you mean '{}'?",
143                    column_ref.name,
144                    similar
145                )),
146                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147            }
148        }
149    }
150
151    /// Find a similar column name using edit distance (for better error messages)
152    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153        let columns = table.column_names();
154        let mut best_match: Option<(String, usize)> = None;
155
156        for col in columns {
157            let distance = edit_distance(name, &col);
158            if distance <= 2 {
159                // Allow up to 2 character differences
160                match best_match {
161                    Some((_, best_dist)) if distance < best_dist => {
162                        best_match = Some((col.clone(), distance));
163                    }
164                    None => {
165                        best_match = Some((col.clone(), distance));
166                    }
167                    _ => {}
168                }
169            }
170        }
171
172        best_match.map(|(name, _)| name)
173    }
174}
175
176impl Default for ExecutionContext {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
/// Levenshtein edit distance between `a` and `b`, measured in `char`s.
///
/// Counts the minimum number of single-character insertions, deletions and
/// substitutions needed to turn `a` into `b`.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // Turning an empty string into the other one takes one insertion per char.
    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // `previous[j]` holds the distance between the already-processed prefix of
    // `source` and the first `j` chars of `target`; `current` is the row being built.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, source_char) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, target_char) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(source_char != target_char);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = substitution.min(deletion).min(insertion);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last completed row lives in `previous`.
    previous[target.len()]
}
222
223/// Query engine that executes SQL directly on `DataTable`
224#[derive(Clone)]
225pub struct QueryEngine {
226    case_insensitive: bool,
227    date_notation: String,
228    _behavior_config: Option<BehaviorConfig>,
229}
230
231impl Default for QueryEngine {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237impl QueryEngine {
238    #[must_use]
239    pub fn new() -> Self {
240        Self {
241            case_insensitive: false,
242            date_notation: get_date_notation(),
243            _behavior_config: None,
244        }
245    }
246
247    #[must_use]
248    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249        let case_insensitive = config.case_insensitive_default;
250        // Use get_date_notation() to respect environment variable override
251        let date_notation = get_date_notation();
252        Self {
253            case_insensitive,
254            date_notation,
255            _behavior_config: Some(config),
256        }
257    }
258
259    #[must_use]
260    pub fn with_date_notation(_date_notation: String) -> Self {
261        Self {
262            case_insensitive: false,
263            date_notation: get_date_notation(), // Always use the global function
264            _behavior_config: None,
265        }
266    }
267
268    #[must_use]
269    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270        Self {
271            case_insensitive,
272            date_notation: get_date_notation(),
273            _behavior_config: None,
274        }
275    }
276
277    #[must_use]
278    pub fn with_case_insensitive_and_date_notation(
279        case_insensitive: bool,
280        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
281    ) -> Self {
282        Self {
283            case_insensitive,
284            date_notation: get_date_notation(), // Always use the global function
285            _behavior_config: None,
286        }
287    }
288
289    /// Find a column name similar to the given name using edit distance
290    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291        let columns = table.column_names();
292        let mut best_match: Option<(String, usize)> = None;
293
294        for col in columns {
295            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296            // Only suggest if distance is small (likely a typo)
297            // Allow up to 3 edits for longer names
298            let max_distance = if name.len() > 10 { 3 } else { 2 };
299            if distance <= max_distance {
300                match &best_match {
301                    None => best_match = Some((col, distance)),
302                    Some((_, best_dist)) if distance < *best_dist => {
303                        best_match = Some((col, distance));
304                    }
305                    _ => {}
306                }
307            }
308        }
309
310        best_match.map(|(name, _)| name)
311    }
312
313    /// Calculate Levenshtein edit distance between two strings
314    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315        let len1 = s1.len();
316        let len2 = s2.len();
317        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319        for i in 0..=len1 {
320            matrix[i][0] = i;
321        }
322        for j in 0..=len2 {
323            matrix[0][j] = j;
324        }
325
326        for (i, c1) in s1.chars().enumerate() {
327            for (j, c2) in s2.chars().enumerate() {
328                let cost = usize::from(c1 != c2);
329                matrix[i + 1][j + 1] = std::cmp::min(
330                    matrix[i][j + 1] + 1, // deletion
331                    std::cmp::min(
332                        matrix[i + 1][j] + 1, // insertion
333                        matrix[i][j] + cost,  // substitution
334                    ),
335                );
336            }
337        }
338
339        matrix[len1][len2]
340    }
341
342    /// Check if an expression contains UNNEST function call
343    fn contains_unnest(expr: &SqlExpression) -> bool {
344        match expr {
345            // Direct UNNEST variant
346            SqlExpression::Unnest { .. } => true,
347            SqlExpression::FunctionCall { name, args, .. } => {
348                if name.to_uppercase() == "UNNEST" {
349                    return true;
350                }
351                // Check recursively in function arguments
352                args.iter().any(Self::contains_unnest)
353            }
354            SqlExpression::BinaryOp { left, right, .. } => {
355                Self::contains_unnest(left) || Self::contains_unnest(right)
356            }
357            SqlExpression::Not { expr } => Self::contains_unnest(expr),
358            SqlExpression::CaseExpression {
359                when_branches,
360                else_branch,
361            } => {
362                when_branches.iter().any(|branch| {
363                    Self::contains_unnest(&branch.condition)
364                        || Self::contains_unnest(&branch.result)
365                }) || else_branch
366                    .as_ref()
367                    .map_or(false, |e| Self::contains_unnest(e))
368            }
369            SqlExpression::SimpleCaseExpression {
370                expr,
371                when_branches,
372                else_branch,
373            } => {
374                Self::contains_unnest(expr)
375                    || when_branches.iter().any(|branch| {
376                        Self::contains_unnest(&branch.value)
377                            || Self::contains_unnest(&branch.result)
378                    })
379                    || else_branch
380                        .as_ref()
381                        .map_or(false, |e| Self::contains_unnest(e))
382            }
383            SqlExpression::InList { expr, values } => {
384                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385            }
386            SqlExpression::NotInList { expr, values } => {
387                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388            }
389            SqlExpression::Between { expr, lower, upper } => {
390                Self::contains_unnest(expr)
391                    || Self::contains_unnest(lower)
392                    || Self::contains_unnest(upper)
393            }
394            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
397            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399            SqlExpression::ChainedMethodCall { base, args, .. } => {
400                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401            }
402            _ => false,
403        }
404    }
405
406    /// Collect all WindowSpecs from an expression (helper for pre-creating contexts)
407    fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408        match expr {
409            SqlExpression::WindowFunction {
410                window_spec, args, ..
411            } => {
412                // Add this window spec
413                specs.push(window_spec.clone());
414                // Recursively check arguments
415                for arg in args {
416                    Self::collect_window_specs(arg, specs);
417                }
418            }
419            SqlExpression::BinaryOp { left, right, .. } => {
420                Self::collect_window_specs(left, specs);
421                Self::collect_window_specs(right, specs);
422            }
423            SqlExpression::Not { expr } => {
424                Self::collect_window_specs(expr, specs);
425            }
426            SqlExpression::FunctionCall { args, .. } => {
427                for arg in args {
428                    Self::collect_window_specs(arg, specs);
429                }
430            }
431            SqlExpression::CaseExpression {
432                when_branches,
433                else_branch,
434            } => {
435                for branch in when_branches {
436                    Self::collect_window_specs(&branch.condition, specs);
437                    Self::collect_window_specs(&branch.result, specs);
438                }
439                if let Some(else_expr) = else_branch {
440                    Self::collect_window_specs(else_expr, specs);
441                }
442            }
443            SqlExpression::SimpleCaseExpression {
444                expr,
445                when_branches,
446                else_branch,
447            } => {
448                Self::collect_window_specs(expr, specs);
449                for branch in when_branches {
450                    Self::collect_window_specs(&branch.value, specs);
451                    Self::collect_window_specs(&branch.result, specs);
452                }
453                if let Some(else_expr) = else_branch {
454                    Self::collect_window_specs(else_expr, specs);
455                }
456            }
457            SqlExpression::InList { expr, values, .. } => {
458                Self::collect_window_specs(expr, specs);
459                for item in values {
460                    Self::collect_window_specs(item, specs);
461                }
462            }
463            SqlExpression::ChainedMethodCall { base, args, .. } => {
464                Self::collect_window_specs(base, specs);
465                for arg in args {
466                    Self::collect_window_specs(arg, specs);
467                }
468            }
469            // Leaf nodes - no recursion needed
470            SqlExpression::Column(_)
471            | SqlExpression::NumberLiteral(_)
472            | SqlExpression::StringLiteral(_)
473            | SqlExpression::BooleanLiteral(_)
474            | SqlExpression::Null
475            | SqlExpression::DateTimeToday { .. }
476            | SqlExpression::DateTimeConstructor { .. }
477            | SqlExpression::MethodCall { .. } => {}
478            // Catch-all for any other variants
479            _ => {}
480        }
481    }
482
483    /// Check if an expression contains a window function
484    fn contains_window_function(expr: &SqlExpression) -> bool {
485        match expr {
486            SqlExpression::WindowFunction { .. } => true,
487            SqlExpression::BinaryOp { left, right, .. } => {
488                Self::contains_window_function(left) || Self::contains_window_function(right)
489            }
490            SqlExpression::Not { expr } => Self::contains_window_function(expr),
491            SqlExpression::FunctionCall { args, .. } => {
492                args.iter().any(Self::contains_window_function)
493            }
494            SqlExpression::CaseExpression {
495                when_branches,
496                else_branch,
497            } => {
498                when_branches.iter().any(|branch| {
499                    Self::contains_window_function(&branch.condition)
500                        || Self::contains_window_function(&branch.result)
501                }) || else_branch
502                    .as_ref()
503                    .map_or(false, |e| Self::contains_window_function(e))
504            }
505            SqlExpression::SimpleCaseExpression {
506                expr,
507                when_branches,
508                else_branch,
509            } => {
510                Self::contains_window_function(expr)
511                    || when_branches.iter().any(|branch| {
512                        Self::contains_window_function(&branch.value)
513                            || Self::contains_window_function(&branch.result)
514                    })
515                    || else_branch
516                        .as_ref()
517                        .map_or(false, |e| Self::contains_window_function(e))
518            }
519            SqlExpression::InList { expr, values } => {
520                Self::contains_window_function(expr)
521                    || values.iter().any(Self::contains_window_function)
522            }
523            SqlExpression::NotInList { expr, values } => {
524                Self::contains_window_function(expr)
525                    || values.iter().any(Self::contains_window_function)
526            }
527            SqlExpression::Between { expr, lower, upper } => {
528                Self::contains_window_function(expr)
529                    || Self::contains_window_function(lower)
530                    || Self::contains_window_function(upper)
531            }
532            SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533            SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534            SqlExpression::MethodCall { args, .. } => {
535                args.iter().any(Self::contains_window_function)
536            }
537            SqlExpression::ChainedMethodCall { base, args, .. } => {
538                Self::contains_window_function(base)
539                    || args.iter().any(Self::contains_window_function)
540            }
541            _ => false,
542        }
543    }
544
545    /// Extract all window function specifications from select items
546    fn extract_window_specs(
547        items: &[SelectItem],
548    ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549        let mut specs = Vec::new();
550        for (idx, item) in items.iter().enumerate() {
551            if let SelectItem::Expression { expr, .. } = item {
552                Self::collect_window_function_specs(expr, idx, &mut specs);
553            }
554        }
555        specs
556    }
557
558    /// Recursively collect window function specs from an expression
559    fn collect_window_function_specs(
560        expr: &SqlExpression,
561        output_column_index: usize,
562        specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563    ) {
564        match expr {
565            SqlExpression::WindowFunction {
566                name,
567                args,
568                window_spec,
569            } => {
570                specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571                    spec: window_spec.clone(),
572                    function_name: name.clone(),
573                    args: args.clone(),
574                    output_column_index,
575                });
576            }
577            SqlExpression::BinaryOp { left, right, .. } => {
578                Self::collect_window_function_specs(left, output_column_index, specs);
579                Self::collect_window_function_specs(right, output_column_index, specs);
580            }
581            SqlExpression::Not { expr } => {
582                Self::collect_window_function_specs(expr, output_column_index, specs);
583            }
584            SqlExpression::FunctionCall { args, .. } => {
585                for arg in args {
586                    Self::collect_window_function_specs(arg, output_column_index, specs);
587                }
588            }
589            SqlExpression::CaseExpression {
590                when_branches,
591                else_branch,
592            } => {
593                for branch in when_branches {
594                    Self::collect_window_function_specs(
595                        &branch.condition,
596                        output_column_index,
597                        specs,
598                    );
599                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600                }
601                if let Some(e) = else_branch {
602                    Self::collect_window_function_specs(e, output_column_index, specs);
603                }
604            }
605            SqlExpression::SimpleCaseExpression {
606                expr,
607                when_branches,
608                else_branch,
609            } => {
610                Self::collect_window_function_specs(expr, output_column_index, specs);
611                for branch in when_branches {
612                    Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614                }
615                if let Some(e) = else_branch {
616                    Self::collect_window_function_specs(e, output_column_index, specs);
617                }
618            }
619            SqlExpression::InList { expr, values } => {
620                Self::collect_window_function_specs(expr, output_column_index, specs);
621                for val in values {
622                    Self::collect_window_function_specs(val, output_column_index, specs);
623                }
624            }
625            SqlExpression::NotInList { expr, values } => {
626                Self::collect_window_function_specs(expr, output_column_index, specs);
627                for val in values {
628                    Self::collect_window_function_specs(val, output_column_index, specs);
629                }
630            }
631            SqlExpression::Between { expr, lower, upper } => {
632                Self::collect_window_function_specs(expr, output_column_index, specs);
633                Self::collect_window_function_specs(lower, output_column_index, specs);
634                Self::collect_window_function_specs(upper, output_column_index, specs);
635            }
636            SqlExpression::InSubquery { expr, .. } => {
637                Self::collect_window_function_specs(expr, output_column_index, specs);
638            }
639            SqlExpression::NotInSubquery { expr, .. } => {
640                Self::collect_window_function_specs(expr, output_column_index, specs);
641            }
642            SqlExpression::MethodCall { args, .. } => {
643                for arg in args {
644                    Self::collect_window_function_specs(arg, output_column_index, specs);
645                }
646            }
647            SqlExpression::ChainedMethodCall { base, args, .. } => {
648                Self::collect_window_function_specs(base, output_column_index, specs);
649                for arg in args {
650                    Self::collect_window_function_specs(arg, output_column_index, specs);
651                }
652            }
653            _ => {} // Other expression types don't contain window functions
654        }
655    }
656
657    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
658    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659        let (view, _plan) = self.execute_with_plan(table, sql)?;
660        Ok(view)
661    }
662
663    /// Execute a SQL query with optional temp table registry access
664    pub fn execute_with_temp_tables(
665        &self,
666        table: Arc<DataTable>,
667        sql: &str,
668        temp_tables: Option<&TempTableRegistry>,
669    ) -> Result<DataView> {
670        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671        Ok(view)
672    }
673
674    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
675    pub fn execute_statement(
676        &self,
677        table: Arc<DataTable>,
678        statement: SelectStatement,
679    ) -> Result<DataView> {
680        self.execute_statement_with_temp_tables(table, statement, None)
681    }
682
683    /// Execute a parsed SelectStatement with optional temp table access
684    pub fn execute_statement_with_temp_tables(
685        &self,
686        table: Arc<DataTable>,
687        statement: SelectStatement,
688        temp_tables: Option<&TempTableRegistry>,
689    ) -> Result<DataView> {
690        // First process CTEs to build context
691        let mut cte_context = HashMap::new();
692
693        // Add temp tables to CTE context if provided
694        if let Some(temp_registry) = temp_tables {
695            for table_name in temp_registry.list_tables() {
696                if let Some(temp_table) = temp_registry.get(&table_name) {
697                    debug!("Adding temp table {} to CTE context", table_name);
698                    let view = DataView::new(temp_table);
699                    cte_context.insert(table_name, Arc::new(view));
700                }
701            }
702        }
703
704        for cte in &statement.ctes {
705            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
706            // Execute the CTE based on its type
707            let cte_result = match &cte.cte_type {
708                CTEType::Standard(query) => {
709                    // Execute the CTE query (it might reference earlier CTEs)
710                    let view = self.build_view_with_context(
711                        table.clone(),
712                        query.clone(),
713                        &mut cte_context,
714                    )?;
715
716                    // Materialize the view and enrich columns with qualified names
717                    let mut materialized = self.materialize_view(view)?;
718
719                    // Enrich columns with qualified names for proper scoping
720                    for column in materialized.columns_mut() {
721                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
722                        column.source_table = Some(cte.name.clone());
723                    }
724
725                    DataView::new(Arc::new(materialized))
726                }
727                CTEType::Web(web_spec) => {
728                    // Fetch data from URL
729                    use crate::web::http_fetcher::WebDataFetcher;
730
731                    let fetcher = WebDataFetcher::new()?;
732                    // Pass None for query context (no full SQL available in these contexts)
733                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
734
735                    // Enrich columns with qualified names for proper scoping
736                    for column in data_table.columns_mut() {
737                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
738                        column.source_table = Some(cte.name.clone());
739                    }
740
741                    // Convert DataTable to DataView
742                    DataView::new(Arc::new(data_table))
743                }
744                CTEType::File(file_spec) => {
745                    let mut data_table =
746                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
747
748                    for column in data_table.columns_mut() {
749                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
750                        column.source_table = Some(cte.name.clone());
751                    }
752
753                    DataView::new(Arc::new(data_table))
754                }
755            };
756            // Store the result in the context for later use
757            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
758            debug!(
759                "QueryEngine: CTE '{}' pre-processed, stored in context",
760                cte.name
761            );
762        }
763
764        // Now process subqueries with CTE context available
765        let mut subquery_executor =
766            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
767        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
768
769        // Build the view with the same CTE context
770        self.build_view_with_context(table, processed_statement, &mut cte_context)
771    }
772
773    /// Execute a statement with provided CTE context (for subqueries)
774    pub fn execute_statement_with_cte_context(
775        &self,
776        table: Arc<DataTable>,
777        statement: SelectStatement,
778        cte_context: &HashMap<String, Arc<DataView>>,
779    ) -> Result<DataView> {
780        // Clone the context so we can add any CTEs from this statement
781        let mut local_context = cte_context.clone();
782
783        // Process any CTEs in this statement (they might be nested)
784        for cte in &statement.ctes {
785            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
786            let cte_result = match &cte.cte_type {
787                CTEType::Standard(query) => {
788                    let view = self.build_view_with_context(
789                        table.clone(),
790                        query.clone(),
791                        &mut local_context,
792                    )?;
793
794                    // Materialize the view and enrich columns with qualified names
795                    let mut materialized = self.materialize_view(view)?;
796
797                    // Enrich columns with qualified names for proper scoping
798                    for column in materialized.columns_mut() {
799                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
800                        column.source_table = Some(cte.name.clone());
801                    }
802
803                    DataView::new(Arc::new(materialized))
804                }
805                CTEType::Web(web_spec) => {
806                    // Fetch data from URL
807                    use crate::web::http_fetcher::WebDataFetcher;
808
809                    let fetcher = WebDataFetcher::new()?;
810                    // Pass None for query context (no full SQL available in these contexts)
811                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
812
813                    // Enrich columns with qualified names for proper scoping
814                    for column in data_table.columns_mut() {
815                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
816                        column.source_table = Some(cte.name.clone());
817                    }
818
819                    // Convert DataTable to DataView
820                    DataView::new(Arc::new(data_table))
821                }
822                CTEType::File(file_spec) => {
823                    let mut data_table =
824                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
825
826                    for column in data_table.columns_mut() {
827                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
828                        column.source_table = Some(cte.name.clone());
829                    }
830
831                    DataView::new(Arc::new(data_table))
832                }
833            };
834            local_context.insert(cte.name.clone(), Arc::new(cte_result));
835        }
836
837        // Process subqueries with the complete context
838        let mut subquery_executor =
839            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
840        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
841
842        // Build the view
843        self.build_view_with_context(table, processed_statement, &mut local_context)
844    }
845
846    /// Execute a query and return both the result and the execution plan
847    pub fn execute_with_plan(
848        &self,
849        table: Arc<DataTable>,
850        sql: &str,
851    ) -> Result<(DataView, ExecutionPlan)> {
852        self.execute_with_plan_and_temp_tables(table, sql, None)
853    }
854
855    /// Execute a query with temp tables and return both the result and the execution plan
856    pub fn execute_with_plan_and_temp_tables(
857        &self,
858        table: Arc<DataTable>,
859        sql: &str,
860        temp_tables: Option<&TempTableRegistry>,
861    ) -> Result<(DataView, ExecutionPlan)> {
862        let mut plan_builder = ExecutionPlanBuilder::new();
863        let start_time = Instant::now();
864
865        // Parse the SQL query
866        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
867        plan_builder.add_detail(format!("Query: {}", sql));
868        let mut parser = Parser::new(sql);
869        let statement = parser
870            .parse()
871            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
872        plan_builder.add_detail(format!("Parsed successfully"));
873        if let Some(ref from_source) = statement.from_source {
874            match from_source {
875                TableSource::Table(name) => {
876                    plan_builder.add_detail(format!("FROM: {}", name));
877                }
878                TableSource::DerivedTable { alias, .. } => {
879                    plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
880                }
881                TableSource::Pivot { .. } => {
882                    plan_builder.add_detail("FROM: PIVOT".to_string());
883                }
884            }
885        }
886        if statement.where_clause.is_some() {
887            plan_builder.add_detail("WHERE clause present".to_string());
888        }
889        plan_builder.end_step();
890
891        // First process CTEs to build context
892        let mut cte_context = HashMap::new();
893
894        // Add temp tables to CTE context if provided
895        if let Some(temp_registry) = temp_tables {
896            for table_name in temp_registry.list_tables() {
897                if let Some(temp_table) = temp_registry.get(&table_name) {
898                    debug!("Adding temp table {} to CTE context", table_name);
899                    let view = DataView::new(temp_table);
900                    cte_context.insert(table_name, Arc::new(view));
901                }
902            }
903        }
904
905        if !statement.ctes.is_empty() {
906            plan_builder.begin_step(
907                StepType::CTE,
908                format!("Process {} CTEs", statement.ctes.len()),
909            );
910
911            for cte in &statement.ctes {
912                let cte_start = Instant::now();
913                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
914
915                let cte_result = match &cte.cte_type {
916                    CTEType::Standard(query) => {
917                        // Add CTE query details
918                        if let Some(ref from_source) = query.from_source {
919                            match from_source {
920                                TableSource::Table(name) => {
921                                    plan_builder.add_detail(format!("Source: {}", name));
922                                }
923                                TableSource::DerivedTable { alias, .. } => {
924                                    plan_builder
925                                        .add_detail(format!("Source: derived table ({})", alias));
926                                }
927                                TableSource::Pivot { .. } => {
928                                    plan_builder.add_detail("Source: PIVOT".to_string());
929                                }
930                            }
931                        }
932                        if query.where_clause.is_some() {
933                            plan_builder.add_detail("Has WHERE clause".to_string());
934                        }
935                        if query.group_by.is_some() {
936                            plan_builder.add_detail("Has GROUP BY".to_string());
937                        }
938
939                        debug!(
940                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
941                            cte.name,
942                            cte_context.keys().collect::<Vec<_>>()
943                        );
944
945                        // Process subqueries in the CTE's query FIRST
946                        // This allows the subqueries to see all previously defined CTEs
947                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
948                            self.clone(),
949                            table.clone(),
950                            cte_context.clone(),
951                        );
952                        let processed_query = subquery_executor.execute_subqueries(query)?;
953
954                        let view = self.build_view_with_context(
955                            table.clone(),
956                            processed_query,
957                            &mut cte_context,
958                        )?;
959
960                        // Materialize the view and enrich columns with qualified names
961                        let mut materialized = self.materialize_view(view)?;
962
963                        // Enrich columns with qualified names for proper scoping
964                        for column in materialized.columns_mut() {
965                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
966                            column.source_table = Some(cte.name.clone());
967                        }
968
969                        DataView::new(Arc::new(materialized))
970                    }
971                    CTEType::Web(web_spec) => {
972                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
973                        if let Some(format) = &web_spec.format {
974                            plan_builder.add_detail(format!("Format: {:?}", format));
975                        }
976                        if let Some(cache) = web_spec.cache_seconds {
977                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
978                        }
979
980                        // Fetch data from URL
981                        use crate::web::http_fetcher::WebDataFetcher;
982
983                        let fetcher = WebDataFetcher::new()?;
984                        // Pass None for query context - each WEB CTE is independent
985                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
986
987                        // Enrich columns with qualified names for proper scoping
988                        for column in data_table.columns_mut() {
989                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
990                            column.source_table = Some(cte.name.clone());
991                        }
992
993                        // Convert DataTable to DataView
994                        DataView::new(Arc::new(data_table))
995                    }
996                    CTEType::File(file_spec) => {
997                        plan_builder.add_detail(format!("PATH: {}", file_spec.path));
998                        if file_spec.recursive {
999                            plan_builder.add_detail("RECURSIVE".to_string());
1000                        }
1001                        if let Some(ref g) = file_spec.glob {
1002                            plan_builder.add_detail(format!("GLOB: {}", g));
1003                        }
1004                        if let Some(d) = file_spec.max_depth {
1005                            plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1006                        }
1007
1008                        let mut data_table =
1009                            crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1010
1011                        for column in data_table.columns_mut() {
1012                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1013                            column.source_table = Some(cte.name.clone());
1014                        }
1015
1016                        DataView::new(Arc::new(data_table))
1017                    }
1018                };
1019
1020                // Record CTE statistics
1021                plan_builder.set_rows_out(cte_result.row_count());
1022                plan_builder.add_detail(format!(
1023                    "Result: {} rows, {} columns",
1024                    cte_result.row_count(),
1025                    cte_result.column_count()
1026                ));
1027                plan_builder.add_detail(format!(
1028                    "Execution time: {:.3}ms",
1029                    cte_start.elapsed().as_secs_f64() * 1000.0
1030                ));
1031
1032                debug!(
1033                    "QueryEngine: Storing CTE '{}' in context with {} rows",
1034                    cte.name,
1035                    cte_result.row_count()
1036                );
1037                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1038                plan_builder.end_step();
1039            }
1040
1041            plan_builder.add_detail(format!(
1042                "All {} CTEs cached in context",
1043                statement.ctes.len()
1044            ));
1045            plan_builder.end_step();
1046        }
1047
1048        // Process subqueries in the statement with CTE context
1049        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1050        let mut subquery_executor =
1051            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1052
1053        // Check if there are subqueries to process
1054        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1055            // This is a simplified check - in reality we'd need to walk the AST
1056            format!("{:?}", w).contains("Subquery")
1057        });
1058
1059        if has_subqueries {
1060            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1061        }
1062
1063        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1064
1065        if has_subqueries {
1066            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1067        } else {
1068            plan_builder.add_detail("No subqueries to process".to_string());
1069        }
1070
1071        plan_builder.end_step();
1072        let result = self.build_view_with_context_and_plan(
1073            table,
1074            processed_statement,
1075            &mut cte_context,
1076            &mut plan_builder,
1077        )?;
1078
1079        let total_duration = start_time.elapsed();
1080        info!(
1081            "Query execution complete: total={:?}, rows={}",
1082            total_duration,
1083            result.row_count()
1084        );
1085
1086        let plan = plan_builder.build();
1087        Ok((result, plan))
1088    }
1089
1090    /// Build a `DataView` from a parsed SQL statement
1091    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1092        let mut cte_context = HashMap::new();
1093        self.build_view_with_context(table, statement, &mut cte_context)
1094    }
1095
1096    /// Build a DataView from a SelectStatement with CTE context
1097    fn build_view_with_context(
1098        &self,
1099        table: Arc<DataTable>,
1100        statement: SelectStatement,
1101        cte_context: &mut HashMap<String, Arc<DataView>>,
1102    ) -> Result<DataView> {
1103        let mut dummy_plan = ExecutionPlanBuilder::new();
1104        let mut exec_context = ExecutionContext::new();
1105        self.build_view_with_context_and_plan_and_exec(
1106            table,
1107            statement,
1108            cte_context,
1109            &mut dummy_plan,
1110            &mut exec_context,
1111        )
1112    }
1113
1114    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
1115    fn build_view_with_context_and_plan(
1116        &self,
1117        table: Arc<DataTable>,
1118        statement: SelectStatement,
1119        cte_context: &mut HashMap<String, Arc<DataView>>,
1120        plan: &mut ExecutionPlanBuilder,
1121    ) -> Result<DataView> {
1122        let mut exec_context = ExecutionContext::new();
1123        self.build_view_with_context_and_plan_and_exec(
1124            table,
1125            statement,
1126            cte_context,
1127            plan,
1128            &mut exec_context,
1129        )
1130    }
1131
1132    /// Build a DataView with CTE context, execution plan, and alias resolution context
1133    fn build_view_with_context_and_plan_and_exec(
1134        &self,
1135        table: Arc<DataTable>,
1136        statement: SelectStatement,
1137        cte_context: &mut HashMap<String, Arc<DataView>>,
1138        plan: &mut ExecutionPlanBuilder,
1139        exec_context: &mut ExecutionContext,
1140    ) -> Result<DataView> {
1141        // First, process any CTEs that aren't already in the context
1142        for cte in &statement.ctes {
1143            // Skip if already processed (e.g., by execute_select for WEB CTEs)
1144            if cte_context.contains_key(&cte.name) {
1145                debug!(
1146                    "QueryEngine: CTE '{}' already in context, skipping",
1147                    cte.name
1148                );
1149                continue;
1150            }
1151
1152            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1153            debug!(
1154                "QueryEngine: Available CTEs for '{}': {:?}",
1155                cte.name,
1156                cte_context.keys().collect::<Vec<_>>()
1157            );
1158
1159            // Execute the CTE query (it might reference earlier CTEs)
1160            let cte_result = match &cte.cte_type {
1161                CTEType::Standard(query) => {
1162                    let view =
1163                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1164
1165                    // Materialize the view and enrich columns with qualified names
1166                    let mut materialized = self.materialize_view(view)?;
1167
1168                    // Enrich columns with qualified names for proper scoping
1169                    for column in materialized.columns_mut() {
1170                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1171                        column.source_table = Some(cte.name.clone());
1172                    }
1173
1174                    DataView::new(Arc::new(materialized))
1175                }
1176                CTEType::Web(_web_spec) => {
1177                    // Web CTEs should have been processed earlier in execute_select
1178                    return Err(anyhow!(
1179                        "Web CTEs should be processed in execute_select method"
1180                    ));
1181                }
1182                CTEType::File(_file_spec) => {
1183                    // FILE CTEs (like WEB) should be processed earlier in execute_select
1184                    return Err(anyhow!(
1185                        "FILE CTEs should be processed in execute_select method"
1186                    ));
1187                }
1188            };
1189
1190            // Store the result in the context for later use
1191            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1192            debug!(
1193                "QueryEngine: CTE '{}' processed, stored in context",
1194                cte.name
1195            );
1196        }
1197
1198        // Determine the source table for the main query
1199        let source_table = if let Some(ref from_source) = statement.from_source {
1200            match from_source {
1201                TableSource::Table(table_name) => {
1202                    // Check if this references a CTE
1203                    if let Some(cte_view) = cte_context.get(table_name) {
1204                        debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1205                        // Materialize the CTE view as a table
1206                        let mut materialized = self.materialize_view((**cte_view).clone())?;
1207
1208                        // Apply alias to qualified column names if present
1209                        #[allow(deprecated)]
1210                        if let Some(ref alias) = statement.from_alias {
1211                            debug!(
1212                                "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1213                                alias, table_name
1214                            );
1215                            for column in materialized.columns_mut() {
1216                                // Replace the CTE name with the alias in qualified names
1217                                if let Some(ref qualified_name) = column.qualified_name {
1218                                    if qualified_name.starts_with(&format!("{}.", table_name)) {
1219                                        column.qualified_name = Some(qualified_name.replace(
1220                                            &format!("{}.", table_name),
1221                                            &format!("{}.", alias),
1222                                        ));
1223                                    }
1224                                }
1225                                // Update source table to reflect the alias
1226                                if column.source_table.as_ref() == Some(table_name) {
1227                                    column.source_table = Some(alias.clone());
1228                                }
1229                            }
1230                        }
1231
1232                        Arc::new(materialized)
1233                    } else {
1234                        // Regular table reference - use the provided table
1235                        table.clone()
1236                    }
1237                }
1238                TableSource::DerivedTable { query, alias } => {
1239                    // Execute the subquery and use its result as the source
1240                    debug!(
1241                        "QueryEngine: Processing FROM derived table (alias: {})",
1242                        alias
1243                    );
1244                    let subquery_result =
1245                        self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1246
1247                    // Convert the DataView to a DataTable for use as source
1248                    // This materializes the subquery result
1249                    let mut materialized = self.materialize_view(subquery_result)?;
1250
1251                    // Apply the alias to all columns in the derived table
1252                    // Note: We set source_table but keep the column names unqualified
1253                    // so they can be referenced without the table prefix
1254                    for column in materialized.columns_mut() {
1255                        column.source_table = Some(alias.clone());
1256                    }
1257
1258                    Arc::new(materialized)
1259                }
1260                TableSource::Pivot { .. } => {
1261                    // PIVOT should have been expanded by PivotExpander transformer
1262                    return Err(anyhow!(
1263                        "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1264                    ));
1265                }
1266            }
1267        } else {
1268            // Fallback to deprecated fields for backward compatibility
1269            #[allow(deprecated)]
1270            if let Some(ref table_func) = statement.from_function {
1271                // Handle table functions like RANGE()
1272                debug!("QueryEngine: Processing table function (deprecated field)...");
1273                match table_func {
1274                    TableFunction::Generator { name, args } => {
1275                        // Use the generator registry to create the table
1276                        use crate::sql::generators::GeneratorRegistry;
1277
1278                        // Create generator registry (could be cached in QueryEngine)
1279                        let registry = GeneratorRegistry::new();
1280
1281                        if let Some(generator) = registry.get(name) {
1282                            // Evaluate arguments
1283                            let mut evaluator = ArithmeticEvaluator::with_date_notation(
1284                                &table,
1285                                self.date_notation.clone(),
1286                            );
1287                            let dummy_row = 0;
1288
1289                            let mut evaluated_args = Vec::new();
1290                            for arg in args {
1291                                evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1292                            }
1293
1294                            // Generate the table
1295                            generator.generate(evaluated_args)?
1296                        } else {
1297                            return Err(anyhow!("Unknown generator function: {}", name));
1298                        }
1299                    }
1300                }
1301            } else {
1302                #[allow(deprecated)]
1303                if let Some(ref subquery) = statement.from_subquery {
1304                    // Execute the subquery and use its result as the source
1305                    debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1306                    let subquery_result = self.build_view_with_context(
1307                        table.clone(),
1308                        *subquery.clone(),
1309                        cte_context,
1310                    )?;
1311
1312                    // Convert the DataView to a DataTable for use as source
1313                    // This materializes the subquery result
1314                    let materialized = self.materialize_view(subquery_result)?;
1315                    Arc::new(materialized)
1316                } else {
1317                    #[allow(deprecated)]
1318                    if let Some(ref table_name) = statement.from_table {
1319                        // Check if this references a CTE
1320                        if let Some(cte_view) = cte_context.get(table_name) {
1321                            debug!(
1322                                "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1323                                table_name
1324                            );
1325                            // Materialize the CTE view as a table
1326                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1327
1328                            // Apply alias to qualified column names if present
1329                            #[allow(deprecated)]
1330                            if let Some(ref alias) = statement.from_alias {
1331                                debug!(
1332                                    "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1333                                    alias, table_name
1334                                );
1335                                for column in materialized.columns_mut() {
1336                                    // Replace the CTE name with the alias in qualified names
1337                                    if let Some(ref qualified_name) = column.qualified_name {
1338                                        if qualified_name.starts_with(&format!("{}.", table_name)) {
1339                                            column.qualified_name = Some(qualified_name.replace(
1340                                                &format!("{}.", table_name),
1341                                                &format!("{}.", alias),
1342                                            ));
1343                                        }
1344                                    }
1345                                    // Update source table to reflect the alias
1346                                    if column.source_table.as_ref() == Some(table_name) {
1347                                        column.source_table = Some(alias.clone());
1348                                    }
1349                                }
1350                            }
1351
1352                            Arc::new(materialized)
1353                        } else {
1354                            // Regular table reference - use the provided table
1355                            table.clone()
1356                        }
1357                    } else {
1358                        // No FROM clause - use the provided table
1359                        table.clone()
1360                    }
1361                }
1362            }
1363        };
1364
1365        // Register alias in execution context if present
1366        #[allow(deprecated)]
1367        if let Some(ref alias) = statement.from_alias {
1368            #[allow(deprecated)]
1369            if let Some(ref table_name) = statement.from_table {
1370                exec_context.register_alias(alias.clone(), table_name.clone());
1371            }
1372        }
1373
1374        // Process JOINs if present
1375        let final_table = if !statement.joins.is_empty() {
1376            plan.begin_step(
1377                StepType::Join,
1378                format!("Process {} JOINs", statement.joins.len()),
1379            );
1380            plan.set_rows_in(source_table.row_count());
1381
1382            let join_executor = HashJoinExecutor::new(self.case_insensitive);
1383            let mut current_table = source_table;
1384
1385            for (idx, join_clause) in statement.joins.iter().enumerate() {
1386                let join_start = Instant::now();
1387                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1388                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1389                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1390                plan.add_detail(format!(
1391                    "Executing {:?} JOIN on {} condition(s)",
1392                    join_clause.join_type,
1393                    join_clause.condition.conditions.len()
1394                ));
1395
1396                // Resolve the right table for the join
1397                let right_table = match &join_clause.table {
1398                    TableSource::Table(name) => {
1399                        // Check if it's a CTE reference
1400                        if let Some(cte_view) = cte_context.get(name) {
1401                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1402
1403                            // Apply alias to qualified column names if present
1404                            if let Some(ref alias) = join_clause.alias {
1405                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1406                                for column in materialized.columns_mut() {
1407                                    // Replace the CTE name with the alias in qualified names
1408                                    if let Some(ref qualified_name) = column.qualified_name {
1409                                        if qualified_name.starts_with(&format!("{}.", name)) {
1410                                            column.qualified_name = Some(qualified_name.replace(
1411                                                &format!("{}.", name),
1412                                                &format!("{}.", alias),
1413                                            ));
1414                                        }
1415                                    }
1416                                    // Update source table to reflect the alias
1417                                    if column.source_table.as_ref() == Some(name) {
1418                                        column.source_table = Some(alias.clone());
1419                                    }
1420                                }
1421                            }
1422
1423                            Arc::new(materialized)
1424                        } else {
1425                            // For now, we need the actual table data
1426                            // In a real implementation, this would load from file
1427                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1428                        }
1429                    }
1430                    TableSource::DerivedTable { query, alias: _ } => {
1431                        // Execute the subquery
1432                        let subquery_result = self.build_view_with_context(
1433                            table.clone(),
1434                            *query.clone(),
1435                            cte_context,
1436                        )?;
1437                        let materialized = self.materialize_view(subquery_result)?;
1438                        Arc::new(materialized)
1439                    }
1440                    TableSource::Pivot { .. } => {
1441                        // PIVOT in JOIN is not supported yet (will be handled by transformer)
1442                        return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1443                    }
1444                };
1445
1446                // Execute the join
1447                let joined = join_executor.execute_join(
1448                    current_table.clone(),
1449                    join_clause,
1450                    right_table.clone(),
1451                )?;
1452
1453                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1454                plan.set_rows_out(joined.row_count());
1455                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1456                plan.add_detail(format!(
1457                    "Join time: {:.3}ms",
1458                    join_start.elapsed().as_secs_f64() * 1000.0
1459                ));
1460                plan.end_step();
1461
1462                current_table = Arc::new(joined);
1463            }
1464
1465            plan.set_rows_out(current_table.row_count());
1466            plan.add_detail(format!(
1467                "Final result after all joins: {} rows",
1468                current_table.row_count()
1469            ));
1470            plan.end_step();
1471            current_table
1472        } else {
1473            source_table
1474        };
1475
1476        // Continue with the existing build_view logic but using final_table
1477        self.build_view_internal_with_plan_and_exec(
1478            final_table,
1479            statement,
1480            plan,
1481            Some(exec_context),
1482        )
1483    }
1484
1485    /// Materialize a DataView into a new DataTable
1486    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1487        let source = view.source();
1488        let mut result_table = DataTable::new("derived");
1489
1490        // Get the visible columns from the view
1491        let visible_cols = view.visible_column_indices().to_vec();
1492
1493        // Copy column definitions
1494        for col_idx in &visible_cols {
1495            let col = &source.columns[*col_idx];
1496            let new_col = DataColumn {
1497                name: col.name.clone(),
1498                data_type: col.data_type.clone(),
1499                nullable: col.nullable,
1500                unique_values: col.unique_values,
1501                null_count: col.null_count,
1502                metadata: col.metadata.clone(),
1503                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1504                source_table: col.source_table.clone(),     // Preserve source table
1505            };
1506            result_table.add_column(new_col);
1507        }
1508
1509        // Copy visible rows
1510        for row_idx in view.visible_row_indices() {
1511            let source_row = &source.rows[*row_idx];
1512            let mut new_row = DataRow { values: Vec::new() };
1513
1514            for col_idx in &visible_cols {
1515                new_row.values.push(source_row.values[*col_idx].clone());
1516            }
1517
1518            result_table.add_row(new_row);
1519        }
1520
1521        Ok(result_table)
1522    }
1523
1524    fn build_view_internal(
1525        &self,
1526        table: Arc<DataTable>,
1527        statement: SelectStatement,
1528    ) -> Result<DataView> {
1529        let mut dummy_plan = ExecutionPlanBuilder::new();
1530        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1531    }
1532
1533    fn build_view_internal_with_plan(
1534        &self,
1535        table: Arc<DataTable>,
1536        statement: SelectStatement,
1537        plan: &mut ExecutionPlanBuilder,
1538    ) -> Result<DataView> {
1539        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1540    }
1541
1542    fn build_view_internal_with_plan_and_exec(
1543        &self,
1544        table: Arc<DataTable>,
1545        statement: SelectStatement,
1546        plan: &mut ExecutionPlanBuilder,
1547        exec_context: Option<&ExecutionContext>,
1548    ) -> Result<DataView> {
1549        debug!(
1550            "QueryEngine::build_view - select_items: {:?}",
1551            statement.select_items
1552        );
1553        debug!(
1554            "QueryEngine::build_view - where_clause: {:?}",
1555            statement.where_clause
1556        );
1557
1558        // Start with all rows visible
1559        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1560
1561        // Apply WHERE clause filtering using recursive evaluator
1562        if let Some(where_clause) = &statement.where_clause {
1563            let total_rows = table.row_count();
1564            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1565            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1566
1567            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1568            plan.set_rows_in(total_rows);
1569            plan.add_detail(format!("Input: {} rows", total_rows));
1570
1571            // Add details about WHERE conditions
1572            for condition in &where_clause.conditions {
1573                plan.add_detail(format!("Condition: {:?}", condition.expr));
1574            }
1575
1576            let filter_start = Instant::now();
1577            // Create an evaluation context for caching compiled regexes
1578            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1579
1580            // Create evaluator ONCE before the loop for performance
1581            let mut evaluator = if let Some(exec_ctx) = exec_context {
1582                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1583                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1584            } else {
1585                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1586            };
1587
1588            // Filter visible rows based on WHERE clause
1589            let mut filtered_rows = Vec::new();
1590            for row_idx in visible_rows {
1591                // Only log for first few rows to avoid performance impact
1592                if row_idx < 3 {
1593                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1594                }
1595
1596                match evaluator.evaluate(where_clause, row_idx) {
1597                    Ok(result) => {
1598                        if row_idx < 3 {
1599                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1600                        }
1601                        if result {
1602                            filtered_rows.push(row_idx);
1603                        }
1604                    }
1605                    Err(e) => {
1606                        if row_idx < 3 {
1607                            debug!(
1608                                "QueryEngine: WHERE evaluation error for row {}: {}",
1609                                row_idx, e
1610                            );
1611                        }
1612                        // Propagate WHERE clause errors instead of silently ignoring them
1613                        return Err(e);
1614                    }
1615                }
1616            }
1617
1618            // Log regex cache statistics
1619            let (compilations, cache_hits) = eval_context.get_stats();
1620            if compilations > 0 || cache_hits > 0 {
1621                debug!(
1622                    "LIKE pattern cache: {} compilations, {} cache hits",
1623                    compilations, cache_hits
1624                );
1625            }
1626            visible_rows = filtered_rows;
1627            let filter_duration = filter_start.elapsed();
1628            info!(
1629                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1630                total_rows,
1631                visible_rows.len(),
1632                filter_duration
1633            );
1634
1635            plan.set_rows_out(visible_rows.len());
1636            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1637            plan.add_detail(format!(
1638                "Filter time: {:.3}ms",
1639                filter_duration.as_secs_f64() * 1000.0
1640            ));
1641            plan.end_step();
1642        }
1643
1644        // Create initial DataView with filtered rows
1645        let mut view = DataView::new(table.clone());
1646        view = view.with_rows(visible_rows);
1647
1648        // Handle GROUP BY if present
1649        if let Some(group_by_exprs) = &statement.group_by {
1650            if !group_by_exprs.is_empty() {
1651                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1652
1653                plan.begin_step(
1654                    StepType::GroupBy,
1655                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1656                );
1657                plan.set_rows_in(view.row_count());
1658                plan.add_detail(format!("Input: {} rows", view.row_count()));
1659                for expr in group_by_exprs {
1660                    plan.add_detail(format!("Group by: {:?}", expr));
1661                }
1662
1663                let group_start = Instant::now();
1664                view = self.apply_group_by(
1665                    view,
1666                    group_by_exprs,
1667                    &statement.select_items,
1668                    statement.having.as_ref(),
1669                    plan,
1670                )?;
1671
1672                // Hide any columns that were promoted from HAVING (synthetic aggregates)
1673                // These have the __hidden_agg_ prefix and should not appear in output
1674                use crate::query_plan::having_alias_transformer::HIDDEN_AGG_PREFIX;
1675                let hidden_indices: Vec<usize> = view
1676                    .source()
1677                    .columns
1678                    .iter()
1679                    .enumerate()
1680                    .filter_map(|(i, c)| {
1681                        if c.name.starts_with(HIDDEN_AGG_PREFIX) {
1682                            Some(i)
1683                        } else {
1684                            None
1685                        }
1686                    })
1687                    .collect();
1688                for &idx in hidden_indices.iter().rev() {
1689                    view.hide_column(idx);
1690                }
1691
1692                plan.set_rows_out(view.row_count());
1693                plan.add_detail(format!("Output: {} groups", view.row_count()));
1694                plan.add_detail(format!(
1695                    "Overall time: {:.3}ms",
1696                    group_start.elapsed().as_secs_f64() * 1000.0
1697                ));
1698                plan.end_step();
1699            }
1700        } else {
1701            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1702            if !statement.select_items.is_empty() {
1703                // Check if we have ANY non-star items (not just the first one)
1704                let has_non_star_items = statement
1705                    .select_items
1706                    .iter()
1707                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1708
1709                // Apply select items if:
1710                // 1. We have computed expressions or explicit columns
1711                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1712                if has_non_star_items || statement.select_items.len() > 1 {
1713                    view = self.apply_select_items(
1714                        view,
1715                        &statement.select_items,
1716                        &statement,
1717                        exec_context,
1718                        plan,
1719                    )?;
1720                }
1721                // If it's just a single star, no projection needed
1722            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1723                debug!("QueryEngine: Using legacy columns path");
1724                // Fallback to legacy column projection for backward compatibility
1725                // Use the current view's source table, not the original table
1726                let source_table = view.source();
1727                let column_indices =
1728                    self.resolve_column_indices(source_table, &statement.columns)?;
1729                view = view.with_columns(column_indices);
1730            }
1731        }
1732
1733        // Apply DISTINCT if specified
1734        if statement.distinct {
1735            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1736            plan.set_rows_in(view.row_count());
1737            plan.add_detail(format!("Input: {} rows", view.row_count()));
1738
1739            let distinct_start = Instant::now();
1740            view = self.apply_distinct(view)?;
1741
1742            plan.set_rows_out(view.row_count());
1743            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1744            plan.add_detail(format!(
1745                "Distinct time: {:.3}ms",
1746                distinct_start.elapsed().as_secs_f64() * 1000.0
1747            ));
1748            plan.end_step();
1749        }
1750
1751        // Apply ORDER BY sorting
1752        if let Some(order_by_columns) = &statement.order_by {
1753            if !order_by_columns.is_empty() {
1754                plan.begin_step(
1755                    StepType::Sort,
1756                    format!("ORDER BY {} columns", order_by_columns.len()),
1757                );
1758                plan.set_rows_in(view.row_count());
1759                for col in order_by_columns {
1760                    // Format the expression (simplified for now - just show column name or "expr")
1761                    let expr_str = match &col.expr {
1762                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1763                        _ => "expr".to_string(),
1764                    };
1765                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1766                }
1767
1768                let sort_start = Instant::now();
1769                view =
1770                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1771
1772                plan.add_detail(format!(
1773                    "Sort time: {:.3}ms",
1774                    sort_start.elapsed().as_secs_f64() * 1000.0
1775                ));
1776                plan.end_step();
1777            }
1778        }
1779
1780        // Apply LIMIT/OFFSET
1781        if let Some(limit) = statement.limit {
1782            let offset = statement.offset.unwrap_or(0);
1783            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1784            plan.set_rows_in(view.row_count());
1785            if offset > 0 {
1786                plan.add_detail(format!("OFFSET: {}", offset));
1787            }
1788            view = view.with_limit(limit, offset);
1789            plan.set_rows_out(view.row_count());
1790            plan.add_detail(format!("Output: {} rows", view.row_count()));
1791            plan.end_step();
1792        }
1793
1794        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1795        if !statement.set_operations.is_empty() {
1796            plan.begin_step(
1797                StepType::SetOperation,
1798                format!("Process {} set operations", statement.set_operations.len()),
1799            );
1800            plan.set_rows_in(view.row_count());
1801
1802            // Materialize the first result set
1803            let mut combined_table = self.materialize_view(view)?;
1804            let first_columns = combined_table.column_names();
1805            let first_column_count = first_columns.len();
1806
1807            // Track if any operation requires deduplication
1808            let mut needs_deduplication = false;
1809
1810            // Process each set operation
1811            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1812                let op_start = Instant::now();
1813                plan.begin_step(
1814                    StepType::SetOperation,
1815                    format!("{:?} operation #{}", operation, idx + 1),
1816                );
1817
1818                // Execute the next SELECT statement
1819                // We need to pass the original table and exec_context for proper resolution
1820                let next_view = if let Some(exec_ctx) = exec_context {
1821                    self.build_view_internal_with_plan_and_exec(
1822                        table.clone(),
1823                        *next_statement.clone(),
1824                        plan,
1825                        Some(exec_ctx),
1826                    )?
1827                } else {
1828                    self.build_view_internal_with_plan(
1829                        table.clone(),
1830                        *next_statement.clone(),
1831                        plan,
1832                    )?
1833                };
1834
1835                // Materialize the next result set
1836                let next_table = self.materialize_view(next_view)?;
1837                let next_columns = next_table.column_names();
1838                let next_column_count = next_columns.len();
1839
1840                // Validate schema compatibility
1841                if first_column_count != next_column_count {
1842                    return Err(anyhow!(
1843                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1844                        first_column_count,
1845                        idx + 2,
1846                        next_column_count
1847                    ));
1848                }
1849
1850                // Warn if column names don't match (but allow it - some SQL dialects do)
1851                for (col_idx, (first_col, next_col)) in
1852                    first_columns.iter().zip(next_columns.iter()).enumerate()
1853                {
1854                    if !first_col.eq_ignore_ascii_case(next_col) {
1855                        debug!(
1856                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1857                            col_idx + 1,
1858                            first_col,
1859                            next_col
1860                        );
1861                    }
1862                }
1863
1864                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1865                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1866
1867                // Perform the set operation
1868                match operation {
1869                    SetOperation::UnionAll => {
1870                        // UNION ALL: Simply concatenate all rows without deduplication
1871                        for row in next_table.rows.iter() {
1872                            combined_table.add_row(row.clone());
1873                        }
1874                        plan.add_detail(format!(
1875                            "Result: {} rows (no deduplication)",
1876                            combined_table.row_count()
1877                        ));
1878                    }
1879                    SetOperation::Union => {
1880                        // UNION: Concatenate all rows first, deduplicate at the end
1881                        for row in next_table.rows.iter() {
1882                            combined_table.add_row(row.clone());
1883                        }
1884                        needs_deduplication = true;
1885                        plan.add_detail(format!(
1886                            "Combined: {} rows (deduplication pending)",
1887                            combined_table.row_count()
1888                        ));
1889                    }
1890                    SetOperation::Intersect => {
1891                        // INTERSECT: Keep only rows that appear in both
1892                        // TODO: Implement intersection logic
1893                        return Err(anyhow!("INTERSECT is not yet implemented"));
1894                    }
1895                    SetOperation::Except => {
1896                        // EXCEPT: Keep only rows from left that don't appear in right
1897                        // TODO: Implement except logic
1898                        return Err(anyhow!("EXCEPT is not yet implemented"));
1899                    }
1900                }
1901
1902                plan.add_detail(format!(
1903                    "Operation time: {:.3}ms",
1904                    op_start.elapsed().as_secs_f64() * 1000.0
1905                ));
1906                plan.set_rows_out(combined_table.row_count());
1907                plan.end_step();
1908            }
1909
1910            plan.set_rows_out(combined_table.row_count());
1911            plan.add_detail(format!(
1912                "Combined result: {} rows after {} operations",
1913                combined_table.row_count(),
1914                statement.set_operations.len()
1915            ));
1916            plan.end_step();
1917
1918            // Create a new view from the combined table
1919            view = DataView::new(Arc::new(combined_table));
1920
1921            // Apply deduplication if any UNION (not UNION ALL) operation was used
1922            if needs_deduplication {
1923                plan.begin_step(
1924                    StepType::Distinct,
1925                    "UNION deduplication - remove duplicate rows".to_string(),
1926                );
1927                plan.set_rows_in(view.row_count());
1928                plan.add_detail(format!("Input: {} rows", view.row_count()));
1929
1930                let distinct_start = Instant::now();
1931                view = self.apply_distinct(view)?;
1932
1933                plan.set_rows_out(view.row_count());
1934                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1935                plan.add_detail(format!(
1936                    "Deduplication time: {:.3}ms",
1937                    distinct_start.elapsed().as_secs_f64() * 1000.0
1938                ));
1939                plan.end_step();
1940            }
1941        }
1942
1943        Ok(view)
1944    }
1945
1946    /// Resolve column names to indices
1947    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1948        let mut indices = Vec::new();
1949        let table_columns = table.column_names();
1950
1951        for col_name in columns {
1952            let index = table_columns
1953                .iter()
1954                .position(|c| c.eq_ignore_ascii_case(col_name))
1955                .ok_or_else(|| {
1956                    let suggestion = self.find_similar_column(table, col_name);
1957                    match suggestion {
1958                        Some(similar) => anyhow::anyhow!(
1959                            "Column '{}' not found. Did you mean '{}'?",
1960                            col_name,
1961                            similar
1962                        ),
1963                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1964                    }
1965                })?;
1966            indices.push(index);
1967        }
1968
1969        Ok(indices)
1970    }
1971
1972    /// Apply SELECT items (columns and computed expressions) to create new view
1973    fn apply_select_items(
1974        &self,
1975        view: DataView,
1976        select_items: &[SelectItem],
1977        _statement: &SelectStatement,
1978        exec_context: Option<&ExecutionContext>,
1979        plan: &mut ExecutionPlanBuilder,
1980    ) -> Result<DataView> {
1981        debug!(
1982            "QueryEngine::apply_select_items - items: {:?}",
1983            select_items
1984        );
1985        debug!(
1986            "QueryEngine::apply_select_items - input view has {} rows",
1987            view.row_count()
1988        );
1989
1990        // Check if any select items contain window functions
1991        let has_window_functions = select_items.iter().any(|item| match item {
1992            SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1993            _ => false,
1994        });
1995
1996        // Count window functions for detailed reporting
1997        let window_func_count: usize = select_items
1998            .iter()
1999            .filter(|item| match item {
2000                SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
2001                _ => false,
2002            })
2003            .count();
2004
2005        // Start timing for window function evaluation if present
2006        let window_start = if has_window_functions {
2007            debug!(
2008                "QueryEngine::apply_select_items - detected {} window functions",
2009                window_func_count
2010            );
2011
2012            // Extract window specs (Step 2: parallel path, not used yet)
2013            let window_specs = Self::extract_window_specs(select_items);
2014            debug!("Extracted {} window function specs", window_specs.len());
2015
2016            Some(Instant::now())
2017        } else {
2018            None
2019        };
2020
2021        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
2022        let has_unnest = select_items.iter().any(|item| match item {
2023            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2024            _ => false,
2025        });
2026
2027        if has_unnest {
2028            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2029            return self.apply_select_with_row_expansion(view, select_items);
2030        }
2031
2032        // Check if this is an aggregate query:
2033        // 1. At least one aggregate function exists
2034        // 2. All other items are either aggregates or constants (aggregate-compatible)
2035        let has_aggregates = select_items.iter().any(|item| match item {
2036            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2037            SelectItem::Column { .. } => false,
2038            SelectItem::Star { .. } => false,
2039            SelectItem::StarExclude { .. } => false,
2040        });
2041
2042        let all_aggregate_compatible = select_items.iter().all(|item| match item {
2043            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2044            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
2045            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
2046            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
2047        });
2048
2049        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2050            // Special handling for aggregate queries with constants (no GROUP BY)
2051            // These should produce exactly one row
2052            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2053            return self.apply_aggregate_select(view, select_items);
2054        }
2055
2056        // Check if we need to create computed columns
2057        let has_computed_expressions = select_items
2058            .iter()
2059            .any(|item| matches!(item, SelectItem::Expression { .. }));
2060
2061        debug!(
2062            "QueryEngine::apply_select_items - has_computed_expressions: {}",
2063            has_computed_expressions
2064        );
2065
2066        if !has_computed_expressions {
2067            // Simple case: only columns, use existing projection logic
2068            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2069            return Ok(view.with_columns(column_indices));
2070        }
2071
2072        // Complex case: we have computed expressions
2073        // IMPORTANT: We create a PROJECTED view, not a new table
2074        // This preserves the original DataTable reference
2075
2076        let source_table = view.source();
2077        let visible_rows = view.visible_row_indices();
2078
2079        // Create a temporary table just for the computed result view
2080        // But this table is only used for the current query result
2081        let mut computed_table = DataTable::new("query_result");
2082
2083        // First, expand any Star selectors to actual columns
2084        let mut expanded_items = Vec::new();
2085        for item in select_items {
2086            match item {
2087                SelectItem::Star { table_prefix, .. } => {
2088                    if let Some(prefix) = table_prefix {
2089                        // Scoped expansion: table.* expands only columns from that table
2090                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2091                        for col in &source_table.columns {
2092                            if Self::column_matches_table(col, prefix) {
2093                                expanded_items.push(SelectItem::Column {
2094                                    column: ColumnRef::unquoted(col.name.clone()),
2095                                    leading_comments: vec![],
2096                                    trailing_comment: None,
2097                                });
2098                            }
2099                        }
2100                    } else {
2101                        // Unscoped expansion: * expands to all columns
2102                        debug!("QueryEngine::apply_select_items - expanding *");
2103                        for col_name in source_table.column_names() {
2104                            expanded_items.push(SelectItem::Column {
2105                                column: ColumnRef::unquoted(col_name.to_string()),
2106                                leading_comments: vec![],
2107                                trailing_comment: None,
2108                            });
2109                        }
2110                    }
2111                }
2112                _ => expanded_items.push(item.clone()),
2113            }
2114        }
2115
2116        // Add columns based on expanded SelectItems, handling duplicates
2117        let mut column_name_counts: std::collections::HashMap<String, usize> =
2118            std::collections::HashMap::new();
2119
2120        for item in &expanded_items {
2121            let base_name = match item {
2122                SelectItem::Column {
2123                    column: col_ref, ..
2124                } => col_ref.name.clone(),
2125                SelectItem::Expression { alias, .. } => alias.clone(),
2126                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2127                SelectItem::StarExclude { .. } => {
2128                    unreachable!("StarExclude should have been expanded")
2129                }
2130            };
2131
2132            // Check if this column name has been used before
2133            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2134            let column_name = if *count == 0 {
2135                // First occurrence, use the name as-is
2136                base_name.clone()
2137            } else {
2138                // Duplicate, append a suffix
2139                format!("{base_name}_{count}")
2140            };
2141            *count += 1;
2142
2143            computed_table.add_column(DataColumn::new(&column_name));
2144        }
2145
2146        // Check if batch evaluation can be used
2147        // Batch evaluation is the default but we need to check if all window functions
2148        // are standalone (not embedded in expressions)
2149        let can_use_batch = expanded_items.iter().all(|item| {
2150            match item {
2151                SelectItem::Expression { expr, .. } => {
2152                    // Only use batch evaluation if the expression IS a window function,
2153                    // not if it CONTAINS a window function
2154                    matches!(expr, SqlExpression::WindowFunction { .. })
2155                        || !Self::contains_window_function(expr)
2156                }
2157                _ => true, // Non-expressions are fine
2158            }
2159        });
2160
2161        // Batch evaluation is now the default mode for improved performance
2162        // Users can opt-out by setting SQL_CLI_BATCH_WINDOW=0 or false
2163        let use_batch_evaluation = can_use_batch
2164            && std::env::var("SQL_CLI_BATCH_WINDOW")
2165                .map(|v| v != "0" && v.to_lowercase() != "false")
2166                .unwrap_or(true);
2167
2168        // Store window specs for batch evaluation if needed
2169        let batch_window_specs = if use_batch_evaluation && has_window_functions {
2170            debug!("BATCH window function evaluation flag is enabled");
2171            // Extract window specs before timing starts
2172            let specs = Self::extract_window_specs(&expanded_items);
2173            debug!(
2174                "Extracted {} window function specs for batch evaluation",
2175                specs.len()
2176            );
2177            Some(specs)
2178        } else {
2179            None
2180        };
2181
2182        // Calculate values for each row
2183        let mut evaluator =
2184            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2185
2186        // Populate table aliases from exec_context if available
2187        if let Some(exec_ctx) = exec_context {
2188            let aliases = exec_ctx.get_aliases();
2189            if !aliases.is_empty() {
2190                debug!(
2191                    "Applying {} aliases to evaluator: {:?}",
2192                    aliases.len(),
2193                    aliases
2194                );
2195                evaluator = evaluator.with_table_aliases(aliases);
2196            }
2197        }
2198
2199        // OPTIMIZATION: Pre-create WindowContexts before the row loop
2200        // This avoids 50,000+ redundant context lookups
2201        if has_window_functions {
2202            let preload_start = Instant::now();
2203
2204            // Extract all unique WindowSpecs from SELECT items
2205            let mut window_specs = Vec::new();
2206            for item in &expanded_items {
2207                if let SelectItem::Expression { expr, .. } = item {
2208                    Self::collect_window_specs(expr, &mut window_specs);
2209                }
2210            }
2211
2212            // Pre-create all WindowContexts
2213            for spec in &window_specs {
2214                let _ = evaluator.get_or_create_window_context(spec);
2215            }
2216
2217            debug!(
2218                "Pre-created {} WindowContext(s) in {:.2}ms",
2219                window_specs.len(),
2220                preload_start.elapsed().as_secs_f64() * 1000.0
2221            );
2222        }
2223
2224        // Batch evaluation path for window functions
2225        if let Some(window_specs) = batch_window_specs {
2226            debug!("Starting batch window function evaluation");
2227            let batch_start = Instant::now();
2228
2229            // Initialize result table with all rows
2230            let mut batch_results: Vec<Vec<DataValue>> =
2231                vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2232
2233            // Use the window specs we extracted earlier
2234            let detailed_window_specs = &window_specs;
2235
2236            // Group window specs by their WindowSpec for batch processing
2237            let mut specs_by_window: HashMap<
2238                u64,
2239                Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2240            > = HashMap::new();
2241            for spec in detailed_window_specs {
2242                let hash = spec.spec.compute_hash();
2243                specs_by_window
2244                    .entry(hash)
2245                    .or_insert_with(Vec::new)
2246                    .push(spec);
2247            }
2248
2249            // Process each unique window specification
2250            for (_window_hash, specs) in specs_by_window {
2251                // Get the window context (already pre-created)
2252                let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2253
2254                // Process each function using this window
2255                for spec in specs {
2256                    match spec.function_name.as_str() {
2257                        "LAG" => {
2258                            // Extract column and offset from arguments
2259                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2260                                let column_name = col_ref.name.as_str();
2261                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2262                                    spec.args.get(1)
2263                                {
2264                                    n.parse::<i64>().unwrap_or(1)
2265                                } else {
2266                                    1 // default offset
2267                                };
2268
2269                                let values = context.evaluate_lag_batch(
2270                                    visible_rows,
2271                                    column_name,
2272                                    offset,
2273                                )?;
2274
2275                                // Write results to the output column
2276                                for (row_idx, value) in values.into_iter().enumerate() {
2277                                    batch_results[row_idx][spec.output_column_index] = value;
2278                                }
2279                            }
2280                        }
2281                        "LEAD" => {
2282                            // Extract column and offset from arguments
2283                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2284                                let column_name = col_ref.name.as_str();
2285                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2286                                    spec.args.get(1)
2287                                {
2288                                    n.parse::<i64>().unwrap_or(1)
2289                                } else {
2290                                    1 // default offset
2291                                };
2292
2293                                let values = context.evaluate_lead_batch(
2294                                    visible_rows,
2295                                    column_name,
2296                                    offset,
2297                                )?;
2298
2299                                // Write results to the output column
2300                                for (row_idx, value) in values.into_iter().enumerate() {
2301                                    batch_results[row_idx][spec.output_column_index] = value;
2302                                }
2303                            }
2304                        }
2305                        "ROW_NUMBER" => {
2306                            let values = context.evaluate_row_number_batch(visible_rows)?;
2307
2308                            // Write results to the output column
2309                            for (row_idx, value) in values.into_iter().enumerate() {
2310                                batch_results[row_idx][spec.output_column_index] = value;
2311                            }
2312                        }
2313                        "RANK" => {
2314                            let values = context.evaluate_rank_batch(visible_rows)?;
2315
2316                            // Write results to the output column
2317                            for (row_idx, value) in values.into_iter().enumerate() {
2318                                batch_results[row_idx][spec.output_column_index] = value;
2319                            }
2320                        }
2321                        "DENSE_RANK" => {
2322                            let values = context.evaluate_dense_rank_batch(visible_rows)?;
2323
2324                            // Write results to the output column
2325                            for (row_idx, value) in values.into_iter().enumerate() {
2326                                batch_results[row_idx][spec.output_column_index] = value;
2327                            }
2328                        }
2329                        "SUM" => {
2330                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2331                                let column_name = col_ref.name.as_str();
2332                                let values =
2333                                    context.evaluate_sum_batch(visible_rows, column_name)?;
2334
2335                                for (row_idx, value) in values.into_iter().enumerate() {
2336                                    batch_results[row_idx][spec.output_column_index] = value;
2337                                }
2338                            }
2339                        }
2340                        "AVG" => {
2341                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2342                                let column_name = col_ref.name.as_str();
2343                                let values =
2344                                    context.evaluate_avg_batch(visible_rows, column_name)?;
2345
2346                                for (row_idx, value) in values.into_iter().enumerate() {
2347                                    batch_results[row_idx][spec.output_column_index] = value;
2348                                }
2349                            }
2350                        }
2351                        "MIN" => {
2352                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2353                                let column_name = col_ref.name.as_str();
2354                                let values =
2355                                    context.evaluate_min_batch(visible_rows, column_name)?;
2356
2357                                for (row_idx, value) in values.into_iter().enumerate() {
2358                                    batch_results[row_idx][spec.output_column_index] = value;
2359                                }
2360                            }
2361                        }
2362                        "MAX" => {
2363                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2364                                let column_name = col_ref.name.as_str();
2365                                let values =
2366                                    context.evaluate_max_batch(visible_rows, column_name)?;
2367
2368                                for (row_idx, value) in values.into_iter().enumerate() {
2369                                    batch_results[row_idx][spec.output_column_index] = value;
2370                                }
2371                            }
2372                        }
2373                        "COUNT" => {
2374                            // COUNT can be COUNT(*) or COUNT(column)
2375                            let column_name = match spec.args.get(0) {
2376                                Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2377                                Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2378                                _ => None,
2379                            };
2380
2381                            let values = context.evaluate_count_batch(visible_rows, column_name)?;
2382
2383                            for (row_idx, value) in values.into_iter().enumerate() {
2384                                batch_results[row_idx][spec.output_column_index] = value;
2385                            }
2386                        }
2387                        "FIRST_VALUE" => {
2388                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2389                                let column_name = col_ref.name.as_str();
2390                                let values = context
2391                                    .evaluate_first_value_batch(visible_rows, column_name)?;
2392
2393                                for (row_idx, value) in values.into_iter().enumerate() {
2394                                    batch_results[row_idx][spec.output_column_index] = value;
2395                                }
2396                            }
2397                        }
2398                        "LAST_VALUE" => {
2399                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2400                                let column_name = col_ref.name.as_str();
2401                                let values =
2402                                    context.evaluate_last_value_batch(visible_rows, column_name)?;
2403
2404                                for (row_idx, value) in values.into_iter().enumerate() {
2405                                    batch_results[row_idx][spec.output_column_index] = value;
2406                                }
2407                            }
2408                        }
2409                        _ => {
2410                            // Fall back to per-row evaluation for unsupported functions
2411                            debug!(
2412                                "Window function {} not supported in batch mode, using per-row",
2413                                spec.function_name
2414                            );
2415                        }
2416                    }
2417                }
2418            }
2419
2420            // Now evaluate non-window columns
2421            for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2422                for (col_idx, item) in expanded_items.iter().enumerate() {
2423                    // Skip if this column was already filled by a window function
2424                    if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2425                        continue;
2426                    }
2427
2428                    let value = match item {
2429                        SelectItem::Column {
2430                            column: col_ref, ..
2431                        } => {
2432                            match evaluator
2433                                .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2434                            {
2435                                Ok(val) => val,
2436                                Err(e) => {
2437                                    return Err(anyhow!(
2438                                        "Failed to evaluate column {}: {}",
2439                                        col_ref.to_sql(),
2440                                        e
2441                                    ));
2442                                }
2443                            }
2444                        }
2445                        SelectItem::Expression { expr, .. } => {
2446                            // For batch evaluation, we need to handle expressions differently
2447                            // If this is just a window function by itself, we already computed it
2448                            if matches!(expr, SqlExpression::WindowFunction { .. }) {
2449                                // Pure window function - already handled in batch evaluation
2450                                continue;
2451                            }
2452                            // For expressions containing window functions or regular expressions,
2453                            // evaluate them normally
2454                            evaluator.evaluate(&expr, source_row_idx)?
2455                        }
2456                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2457                        SelectItem::StarExclude { .. } => {
2458                            unreachable!("StarExclude should have been expanded")
2459                        }
2460                    };
2461                    batch_results[result_row_idx][col_idx] = value;
2462                }
2463            }
2464
2465            // Add all rows to the table
2466            for row_values in batch_results {
2467                computed_table
2468                    .add_row(DataRow::new(row_values))
2469                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2470            }
2471
2472            debug!(
2473                "Batch window evaluation completed in {:.3}ms",
2474                batch_start.elapsed().as_secs_f64() * 1000.0
2475            );
2476        } else {
2477            // Original per-row evaluation path
2478            for &row_idx in visible_rows {
2479                let mut row_values = Vec::new();
2480
2481                for item in &expanded_items {
2482                    let value = match item {
2483                        SelectItem::Column {
2484                            column: col_ref, ..
2485                        } => {
2486                            // Use evaluator for column resolution (handles aliases properly)
2487                            match evaluator
2488                                .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2489                            {
2490                                Ok(val) => val,
2491                                Err(e) => {
2492                                    return Err(anyhow!(
2493                                        "Failed to evaluate column {}: {}",
2494                                        col_ref.to_sql(),
2495                                        e
2496                                    ));
2497                                }
2498                            }
2499                        }
2500                        SelectItem::Expression { expr, .. } => {
2501                            // Computed expression
2502                            evaluator.evaluate(&expr, row_idx)?
2503                        }
2504                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2505                        SelectItem::StarExclude { .. } => {
2506                            unreachable!("StarExclude should have been expanded")
2507                        }
2508                    };
2509                    row_values.push(value);
2510                }
2511
2512                computed_table
2513                    .add_row(DataRow::new(row_values))
2514                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2515            }
2516        }
2517
2518        // Log window function timing if applicable
2519        if let Some(start) = window_start {
2520            let window_duration = start.elapsed();
2521            info!(
2522                "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2523                window_duration.as_secs_f64() * 1000.0,
2524                visible_rows.len(),
2525                window_func_count
2526            );
2527
2528            // Add to execution plan
2529            plan.begin_step(
2530                StepType::WindowFunction,
2531                format!("Evaluate {} window function(s)", window_func_count),
2532            );
2533            plan.set_rows_in(visible_rows.len());
2534            plan.set_rows_out(visible_rows.len());
2535            plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2536            plan.add_detail(format!("{} window functions evaluated", window_func_count));
2537            plan.add_detail(format!(
2538                "Evaluation time: {:.3}ms",
2539                window_duration.as_secs_f64() * 1000.0
2540            ));
2541            plan.end_step();
2542        }
2543
2544        // Return a view of the computed result
2545        // This is a temporary view for this query only
2546        Ok(DataView::new(Arc::new(computed_table)))
2547    }
2548
2549    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
2550    fn apply_select_with_row_expansion(
2551        &self,
2552        view: DataView,
2553        select_items: &[SelectItem],
2554    ) -> Result<DataView> {
2555        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2556
2557        let source_table = view.source();
2558        let visible_rows = view.visible_row_indices();
2559        let expander_registry = RowExpanderRegistry::new();
2560
2561        // Create result table
2562        let mut result_table = DataTable::new("unnest_result");
2563
2564        // Expand * to columns and set up result columns
2565        let mut expanded_items = Vec::new();
2566        for item in select_items {
2567            match item {
2568                SelectItem::Star { table_prefix, .. } => {
2569                    if let Some(prefix) = table_prefix {
2570                        // Scoped expansion: table.* expands only columns from that table
2571                        debug!(
2572                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2573                            prefix
2574                        );
2575                        for col in &source_table.columns {
2576                            if Self::column_matches_table(col, prefix) {
2577                                expanded_items.push(SelectItem::Column {
2578                                    column: ColumnRef::unquoted(col.name.clone()),
2579                                    leading_comments: vec![],
2580                                    trailing_comment: None,
2581                                });
2582                            }
2583                        }
2584                    } else {
2585                        // Unscoped expansion: * expands to all columns
2586                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2587                        for col_name in source_table.column_names() {
2588                            expanded_items.push(SelectItem::Column {
2589                                column: ColumnRef::unquoted(col_name.to_string()),
2590                                leading_comments: vec![],
2591                                trailing_comment: None,
2592                            });
2593                        }
2594                    }
2595                }
2596                _ => expanded_items.push(item.clone()),
2597            }
2598        }
2599
2600        // Add columns to result table
2601        for item in &expanded_items {
2602            let column_name = match item {
2603                SelectItem::Column {
2604                    column: col_ref, ..
2605                } => col_ref.name.clone(),
2606                SelectItem::Expression { alias, .. } => alias.clone(),
2607                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2608                SelectItem::StarExclude { .. } => {
2609                    unreachable!("StarExclude should have been expanded")
2610                }
2611            };
2612            result_table.add_column(DataColumn::new(&column_name));
2613        }
2614
2615        // Process each input row
2616        let mut evaluator =
2617            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2618
2619        for &row_idx in visible_rows {
2620            // First pass: identify UNNEST expressions and collect their expansion arrays
2621            let mut unnest_expansions = Vec::new();
2622            let mut unnest_indices = Vec::new();
2623
2624            for (col_idx, item) in expanded_items.iter().enumerate() {
2625                if let SelectItem::Expression { expr, .. } = item {
2626                    if let Some(expansion_result) = self.try_expand_unnest(
2627                        &expr,
2628                        source_table,
2629                        row_idx,
2630                        &mut evaluator,
2631                        &expander_registry,
2632                    )? {
2633                        unnest_expansions.push(expansion_result);
2634                        unnest_indices.push(col_idx);
2635                    }
2636                }
2637            }
2638
2639            // Determine how many output rows to generate
2640            let expansion_count = if unnest_expansions.is_empty() {
2641                1 // No UNNEST, just one row
2642            } else {
2643                unnest_expansions
2644                    .iter()
2645                    .map(|exp| exp.row_count())
2646                    .max()
2647                    .unwrap_or(1)
2648            };
2649
2650            // Generate output rows
2651            for output_idx in 0..expansion_count {
2652                let mut row_values = Vec::new();
2653
2654                for (col_idx, item) in expanded_items.iter().enumerate() {
2655                    // Check if this column is an UNNEST column
2656                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2657
2658                    let value = if let Some(unnest_idx) = unnest_position {
2659                        // Get value from expansion array (or NULL if exhausted)
2660                        let expansion = &unnest_expansions[unnest_idx];
2661                        expansion
2662                            .values
2663                            .get(output_idx)
2664                            .cloned()
2665                            .unwrap_or(DataValue::Null)
2666                    } else {
2667                        // Regular column or non-UNNEST expression - replicate from input
2668                        match item {
2669                            SelectItem::Column {
2670                                column: col_ref, ..
2671                            } => {
2672                                let col_idx =
2673                                    source_table.get_column_index(&col_ref.name).ok_or_else(
2674                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2675                                    )?;
2676                                let row = source_table
2677                                    .get_row(row_idx)
2678                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2679                                row.get(col_idx)
2680                                    .ok_or_else(|| {
2681                                        anyhow::anyhow!("Column {} not found in row", col_idx)
2682                                    })?
2683                                    .clone()
2684                            }
2685                            SelectItem::Expression { expr, .. } => {
2686                                // Non-UNNEST expression - evaluate once and replicate
2687                                evaluator.evaluate(&expr, row_idx)?
2688                            }
2689                            SelectItem::Star { .. } => unreachable!(),
2690                            SelectItem::StarExclude { .. } => {
2691                                unreachable!("StarExclude should have been expanded")
2692                            }
2693                        }
2694                    };
2695
2696                    row_values.push(value);
2697                }
2698
2699                result_table
2700                    .add_row(DataRow::new(row_values))
2701                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2702            }
2703        }
2704
2705        debug!(
2706            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2707            visible_rows.len(),
2708            result_table.row_count()
2709        );
2710
2711        Ok(DataView::new(Arc::new(result_table)))
2712    }
2713
2714    /// Try to expand an expression if it's an UNNEST call
2715    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
2716    fn try_expand_unnest(
2717        &self,
2718        expr: &SqlExpression,
2719        _source_table: &DataTable,
2720        row_idx: usize,
2721        evaluator: &mut ArithmeticEvaluator,
2722        expander_registry: &RowExpanderRegistry,
2723    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2724        // Check for UNNEST variant (direct syntax)
2725        if let SqlExpression::Unnest { column, delimiter } = expr {
2726            // Evaluate the column expression
2727            let column_value = evaluator.evaluate(column, row_idx)?;
2728
2729            // Delimiter is already a string literal
2730            let delimiter_value = DataValue::String(delimiter.clone());
2731
2732            // Get the UNNEST expander
2733            let expander = expander_registry
2734                .get("UNNEST")
2735                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2736
2737            // Expand the value
2738            let expansion = expander.expand(&column_value, &[delimiter_value])?;
2739            return Ok(Some(expansion));
2740        }
2741
2742        // Also check for FunctionCall form (for compatibility)
2743        if let SqlExpression::FunctionCall { name, args, .. } = expr {
2744            if name.to_uppercase() == "UNNEST" {
2745                // UNNEST(column, delimiter)
2746                if args.len() != 2 {
2747                    return Err(anyhow::anyhow!(
2748                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2749                    ));
2750                }
2751
2752                // Evaluate the column expression (first arg)
2753                let column_value = evaluator.evaluate(&args[0], row_idx)?;
2754
2755                // Evaluate the delimiter expression (second arg)
2756                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2757
2758                // Get the UNNEST expander
2759                let expander = expander_registry
2760                    .get("UNNEST")
2761                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2762
2763                // Expand the value
2764                let expansion = expander.expand(&column_value, &[delimiter_value])?;
2765                return Ok(Some(expansion));
2766            }
2767        }
2768
2769        Ok(None)
2770    }
2771
2772    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
2773    fn apply_aggregate_select(
2774        &self,
2775        view: DataView,
2776        select_items: &[SelectItem],
2777    ) -> Result<DataView> {
2778        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2779
2780        let source_table = view.source();
2781        let mut result_table = DataTable::new("aggregate_result");
2782
2783        // Add columns for each select item
2784        for item in select_items {
2785            let column_name = match item {
2786                SelectItem::Expression { alias, .. } => alias.clone(),
2787                _ => unreachable!("Should only have expressions in aggregate-only query"),
2788            };
2789            result_table.add_column(DataColumn::new(&column_name));
2790        }
2791
2792        // Create evaluator with visible rows from the view (for filtered aggregates)
2793        let visible_rows = view.visible_row_indices().to_vec();
2794        let mut evaluator =
2795            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2796                .with_visible_rows(visible_rows);
2797
2798        // Evaluate each aggregate expression once (they handle all rows internally)
2799        let mut row_values = Vec::new();
2800        for item in select_items {
2801            match item {
2802                SelectItem::Expression { expr, .. } => {
2803                    // The evaluator will handle aggregates over all rows
2804                    // We pass row_index=0 but aggregates ignore it and process all rows
2805                    let value = evaluator.evaluate(expr, 0)?;
2806                    row_values.push(value);
2807                }
2808                _ => unreachable!("Should only have expressions in aggregate-only query"),
2809            }
2810        }
2811
2812        // Add the single result row
2813        result_table
2814            .add_row(DataRow::new(row_values))
2815            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2816
2817        Ok(DataView::new(Arc::new(result_table)))
2818    }
2819
2820    /// Check if a column belongs to a specific table based on source_table or qualified_name
2821    ///
2822    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2823    /// to filter which columns should be included.
2824    ///
2825    /// # Arguments
2826    /// * `col` - The column to check
2827    /// * `table_name` - The table name or alias to match against
2828    ///
2829    /// # Returns
2830    /// `true` if the column belongs to the specified table
2831    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2832        // First, check the source_table field
2833        if let Some(ref source) = col.source_table {
2834            // Direct match or matches with schema qualification
2835            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2836                return true;
2837            }
2838        }
2839
2840        // Second, check the qualified_name field
2841        if let Some(ref qualified) = col.qualified_name {
2842            // Check if qualified name starts with "table_name."
2843            if qualified.starts_with(&format!("{}.", table_name)) {
2844                return true;
2845            }
2846        }
2847
2848        false
2849    }
2850
2851    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2852    fn resolve_select_columns(
2853        &self,
2854        table: &DataTable,
2855        select_items: &[SelectItem],
2856    ) -> Result<Vec<usize>> {
2857        let mut indices = Vec::new();
2858        let table_columns = table.column_names();
2859
2860        for item in select_items {
2861            match item {
2862                SelectItem::Column {
2863                    column: col_ref, ..
2864                } => {
2865                    // Check if this has a table prefix
2866                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2867                        // For qualified references, ONLY try qualified lookup - no fallback
2868                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2869                        table.find_column_by_qualified_name(&qualified_name)
2870                            .ok_or_else(|| {
2871                                // Check if any columns have qualified names for better error message
2872                                let has_qualified = table.columns.iter()
2873                                    .any(|c| c.qualified_name.is_some());
2874                                if !has_qualified {
2875                                    anyhow::anyhow!(
2876                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2877                                        qualified_name, table_prefix
2878                                    )
2879                                } else {
2880                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2881                                }
2882                            })?
2883                    } else {
2884                        // Simple column name lookup
2885                        table_columns
2886                            .iter()
2887                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2888                            .ok_or_else(|| {
2889                                let suggestion = self.find_similar_column(table, &col_ref.name);
2890                                match suggestion {
2891                                    Some(similar) => anyhow::anyhow!(
2892                                        "Column '{}' not found. Did you mean '{}'?",
2893                                        col_ref.name,
2894                                        similar
2895                                    ),
2896                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2897                                }
2898                            })?
2899                    };
2900                    indices.push(index);
2901                }
2902                SelectItem::Star { table_prefix, .. } => {
2903                    if let Some(prefix) = table_prefix {
2904                        // Scoped expansion: table.* expands only columns from that table
2905                        for (i, col) in table.columns.iter().enumerate() {
2906                            if Self::column_matches_table(col, prefix) {
2907                                indices.push(i);
2908                            }
2909                        }
2910                    } else {
2911                        // Unscoped expansion: * expands to all column indices
2912                        for i in 0..table_columns.len() {
2913                            indices.push(i);
2914                        }
2915                    }
2916                }
2917                SelectItem::StarExclude {
2918                    table_prefix,
2919                    excluded_columns,
2920                    ..
2921                } => {
2922                    // Expand all columns (with optional table prefix), then exclude specified ones
2923                    if let Some(prefix) = table_prefix {
2924                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2925                        for (i, col) in table.columns.iter().enumerate() {
2926                            if Self::column_matches_table(col, prefix)
2927                                && !excluded_columns.contains(&col.name)
2928                            {
2929                                indices.push(i);
2930                            }
2931                        }
2932                    } else {
2933                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2934                        for (i, col_name) in table_columns.iter().enumerate() {
2935                            if !excluded_columns
2936                                .iter()
2937                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2938                            {
2939                                indices.push(i);
2940                            }
2941                        }
2942                    }
2943                }
2944                SelectItem::Expression { .. } => {
2945                    return Err(anyhow::anyhow!(
2946                        "Computed expressions require new table creation"
2947                    ));
2948                }
2949            }
2950        }
2951
2952        Ok(indices)
2953    }
2954
2955    /// Apply DISTINCT to remove duplicate rows
2956    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2957        use std::collections::HashSet;
2958
2959        let source = view.source();
2960        let visible_cols = view.visible_column_indices();
2961        let visible_rows = view.visible_row_indices();
2962
2963        // Build a set to track unique rows
2964        let mut seen_rows = HashSet::new();
2965        let mut unique_row_indices = Vec::new();
2966
2967        for &row_idx in visible_rows {
2968            // Build a key representing this row's visible column values
2969            let mut row_key = Vec::new();
2970            for &col_idx in visible_cols {
2971                let value = source
2972                    .get_value(row_idx, col_idx)
2973                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2974                // Convert value to a hashable representation
2975                row_key.push(format!("{:?}", value));
2976            }
2977
2978            // Check if we've seen this row before
2979            if seen_rows.insert(row_key) {
2980                // First time seeing this row combination
2981                unique_row_indices.push(row_idx);
2982            }
2983        }
2984
2985        // Create a new view with only unique rows
2986        Ok(view.with_rows(unique_row_indices))
2987    }
2988
2989    /// Apply multi-column ORDER BY sorting to the view
2990    fn apply_multi_order_by(
2991        &self,
2992        view: DataView,
2993        order_by_columns: &[OrderByItem],
2994    ) -> Result<DataView> {
2995        self.apply_multi_order_by_with_context(view, order_by_columns, None)
2996    }
2997
2998    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
2999    fn apply_multi_order_by_with_context(
3000        &self,
3001        mut view: DataView,
3002        order_by_columns: &[OrderByItem],
3003        _exec_context: Option<&ExecutionContext>,
3004    ) -> Result<DataView> {
3005        // Build list of (source_column_index, ascending) tuples
3006        let mut sort_columns = Vec::new();
3007
3008        for order_col in order_by_columns {
3009            // Extract column name from expression (currently only supports simple columns)
3010            let column_name = match &order_col.expr {
3011                SqlExpression::Column(col_ref) => col_ref.name.clone(),
3012                _ => {
3013                    // TODO: Support expression evaluation in ORDER BY
3014                    return Err(anyhow!(
3015                        "ORDER BY expressions not yet supported - only simple columns allowed"
3016                    ));
3017                }
3018            };
3019
3020            // Try to find the column index, handling qualified column names (table.column)
3021            let col_index = if column_name.contains('.') {
3022                // Qualified column name - extract unqualified part
3023                if let Some(dot_pos) = column_name.rfind('.') {
3024                    let col_name = &column_name[dot_pos + 1..];
3025
3026                    // After SELECT processing, columns are unqualified
3027                    // So just use the column name part
3028                    debug!(
3029                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
3030                        col_name, column_name
3031                    );
3032                    view.source().get_column_index(col_name)
3033                } else {
3034                    view.source().get_column_index(&column_name)
3035                }
3036            } else {
3037                // Simple column name
3038                view.source().get_column_index(&column_name)
3039            }
3040            .ok_or_else(|| {
3041                // If not found, provide helpful error with suggestions
3042                let suggestion = self.find_similar_column(view.source(), &column_name);
3043                match suggestion {
3044                    Some(similar) => anyhow::anyhow!(
3045                        "Column '{}' not found. Did you mean '{}'?",
3046                        column_name,
3047                        similar
3048                    ),
3049                    None => {
3050                        // Also list available columns for debugging
3051                        let available_cols = view.source().column_names().join(", ");
3052                        anyhow::anyhow!(
3053                            "Column '{}' not found. Available columns: {}",
3054                            column_name,
3055                            available_cols
3056                        )
3057                    }
3058                }
3059            })?;
3060
3061            let ascending = matches!(order_col.direction, SortDirection::Asc);
3062            sort_columns.push((col_index, ascending));
3063        }
3064
3065        // Apply multi-column sorting
3066        view.apply_multi_sort(&sort_columns)?;
3067        Ok(view)
3068    }
3069
3070    /// Apply GROUP BY to the view with optional HAVING clause
3071    fn apply_group_by(
3072        &self,
3073        view: DataView,
3074        group_by_exprs: &[SqlExpression],
3075        select_items: &[SelectItem],
3076        having: Option<&SqlExpression>,
3077        plan: &mut ExecutionPlanBuilder,
3078    ) -> Result<DataView> {
3079        // Use the new expression-based GROUP BY implementation
3080        let (result_view, phase_info) = self.apply_group_by_expressions(
3081            view,
3082            group_by_exprs,
3083            select_items,
3084            having,
3085            self.case_insensitive,
3086            self.date_notation.clone(),
3087        )?;
3088
3089        // Add detailed phase information to the execution plan
3090        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3091        plan.add_detail(format!(
3092            "Phase 1 - Group Building: {:.3}ms",
3093            phase_info.phase2_key_building.as_secs_f64() * 1000.0
3094        ));
3095        plan.add_detail(format!(
3096            "  • Processing {} rows into {} groups",
3097            phase_info.total_rows, phase_info.num_groups
3098        ));
3099        plan.add_detail(format!(
3100            "Phase 2 - Aggregation: {:.3}ms",
3101            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3102        ));
3103        if phase_info.phase4_having_evaluation > Duration::ZERO {
3104            plan.add_detail(format!(
3105                "Phase 3 - HAVING Filter: {:.3}ms",
3106                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3107            ));
3108            plan.add_detail(format!(
3109                "  • Filtered {} groups",
3110                phase_info.groups_filtered_by_having
3111            ));
3112        }
3113        plan.add_detail(format!(
3114            "Total GROUP BY time: {:.3}ms",
3115            phase_info.total_time.as_secs_f64() * 1000.0
3116        ));
3117
3118        Ok(result_view)
3119    }
3120
3121    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
3122    /// This helps pre-size hash tables for better performance
3123    pub fn estimate_group_cardinality(
3124        &self,
3125        view: &DataView,
3126        group_by_exprs: &[SqlExpression],
3127    ) -> usize {
3128        // If we have few rows, just return the row count as upper bound
3129        let row_count = view.get_visible_rows().len();
3130        if row_count <= 100 {
3131            return row_count;
3132        }
3133
3134        // Sample first 1000 rows or 10% of data, whichever is smaller
3135        let sample_size = min(1000, row_count / 10).max(100);
3136        let mut seen = FxHashSet::default();
3137
3138        let visible_rows = view.get_visible_rows();
3139        for (i, &row_idx) in visible_rows.iter().enumerate() {
3140            if i >= sample_size {
3141                break;
3142            }
3143
3144            // Evaluate GROUP BY expressions for this row
3145            let mut key_values = Vec::new();
3146            for expr in group_by_exprs {
3147                let mut evaluator = ArithmeticEvaluator::new(view.source());
3148                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3149                key_values.push(value);
3150            }
3151
3152            seen.insert(key_values);
3153        }
3154
3155        // Estimate total cardinality based on sample
3156        let sample_cardinality = seen.len();
3157        let estimated = (sample_cardinality * row_count) / sample_size;
3158
3159        // Cap at row count and ensure minimum of sample cardinality
3160        estimated.min(row_count).max(sample_cardinality)
3161    }
3162}
3163
3164#[cfg(test)]
3165mod tests {
3166    use super::*;
3167    use crate::data::datatable::{DataColumn, DataRow, DataValue};
3168
3169    fn create_test_table() -> Arc<DataTable> {
3170        let mut table = DataTable::new("test");
3171
3172        // Add columns
3173        table.add_column(DataColumn::new("id"));
3174        table.add_column(DataColumn::new("name"));
3175        table.add_column(DataColumn::new("age"));
3176
3177        // Add rows
3178        table
3179            .add_row(DataRow::new(vec![
3180                DataValue::Integer(1),
3181                DataValue::String("Alice".to_string()),
3182                DataValue::Integer(30),
3183            ]))
3184            .unwrap();
3185
3186        table
3187            .add_row(DataRow::new(vec![
3188                DataValue::Integer(2),
3189                DataValue::String("Bob".to_string()),
3190                DataValue::Integer(25),
3191            ]))
3192            .unwrap();
3193
3194        table
3195            .add_row(DataRow::new(vec![
3196                DataValue::Integer(3),
3197                DataValue::String("Charlie".to_string()),
3198                DataValue::Integer(35),
3199            ]))
3200            .unwrap();
3201
3202        Arc::new(table)
3203    }
3204
3205    #[test]
3206    fn test_select_all() {
3207        let table = create_test_table();
3208        let engine = QueryEngine::new();
3209
3210        let view = engine
3211            .execute(table.clone(), "SELECT * FROM users")
3212            .unwrap();
3213        assert_eq!(view.row_count(), 3);
3214        assert_eq!(view.column_count(), 3);
3215    }
3216
3217    #[test]
3218    fn test_select_columns() {
3219        let table = create_test_table();
3220        let engine = QueryEngine::new();
3221
3222        let view = engine
3223            .execute(table.clone(), "SELECT name, age FROM users")
3224            .unwrap();
3225        assert_eq!(view.row_count(), 3);
3226        assert_eq!(view.column_count(), 2);
3227    }
3228
3229    #[test]
3230    fn test_select_with_limit() {
3231        let table = create_test_table();
3232        let engine = QueryEngine::new();
3233
3234        let view = engine
3235            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3236            .unwrap();
3237        assert_eq!(view.row_count(), 2);
3238    }
3239
3240    #[test]
3241    fn test_type_coercion_contains() {
3242        // Initialize tracing for debug output
3243        let _ = tracing_subscriber::fmt()
3244            .with_max_level(tracing::Level::DEBUG)
3245            .try_init();
3246
3247        let mut table = DataTable::new("test");
3248        table.add_column(DataColumn::new("id"));
3249        table.add_column(DataColumn::new("status"));
3250        table.add_column(DataColumn::new("price"));
3251
3252        // Add test data with mixed types
3253        table
3254            .add_row(DataRow::new(vec![
3255                DataValue::Integer(1),
3256                DataValue::String("Pending".to_string()),
3257                DataValue::Float(99.99),
3258            ]))
3259            .unwrap();
3260
3261        table
3262            .add_row(DataRow::new(vec![
3263                DataValue::Integer(2),
3264                DataValue::String("Confirmed".to_string()),
3265                DataValue::Float(150.50),
3266            ]))
3267            .unwrap();
3268
3269        table
3270            .add_row(DataRow::new(vec![
3271                DataValue::Integer(3),
3272                DataValue::String("Pending".to_string()),
3273                DataValue::Float(75.00),
3274            ]))
3275            .unwrap();
3276
3277        let table = Arc::new(table);
3278        let engine = QueryEngine::new();
3279
3280        println!("\n=== Testing WHERE clause with Contains ===");
3281        println!("Table has {} rows", table.row_count());
3282        for i in 0..table.row_count() {
3283            let status = table.get_value(i, 1);
3284            println!("Row {i}: status = {status:?}");
3285        }
3286
3287        // Test 1: Basic string contains (should work)
3288        println!("\n--- Test 1: status.Contains('pend') ---");
3289        let result = engine.execute(
3290            table.clone(),
3291            "SELECT * FROM test WHERE status.Contains('pend')",
3292        );
3293        match result {
3294            Ok(view) => {
3295                println!("SUCCESS: Found {} matching rows", view.row_count());
3296                assert_eq!(view.row_count(), 2); // Should find both Pending rows
3297            }
3298            Err(e) => {
3299                panic!("Query failed: {e}");
3300            }
3301        }
3302
3303        // Test 2: Numeric contains (should work with type coercion)
3304        println!("\n--- Test 2: price.Contains('9') ---");
3305        let result = engine.execute(
3306            table.clone(),
3307            "SELECT * FROM test WHERE price.Contains('9')",
3308        );
3309        match result {
3310            Ok(view) => {
3311                println!(
3312                    "SUCCESS: Found {} matching rows with price containing '9'",
3313                    view.row_count()
3314                );
3315                // Should find 99.99 row
3316                assert!(view.row_count() >= 1);
3317            }
3318            Err(e) => {
3319                panic!("Numeric coercion query failed: {e}");
3320            }
3321        }
3322
3323        println!("\n=== All tests passed! ===");
3324    }
3325
3326    #[test]
3327    fn test_not_in_clause() {
3328        // Initialize tracing for debug output
3329        let _ = tracing_subscriber::fmt()
3330            .with_max_level(tracing::Level::DEBUG)
3331            .try_init();
3332
3333        let mut table = DataTable::new("test");
3334        table.add_column(DataColumn::new("id"));
3335        table.add_column(DataColumn::new("country"));
3336
3337        // Add test data
3338        table
3339            .add_row(DataRow::new(vec![
3340                DataValue::Integer(1),
3341                DataValue::String("CA".to_string()),
3342            ]))
3343            .unwrap();
3344
3345        table
3346            .add_row(DataRow::new(vec![
3347                DataValue::Integer(2),
3348                DataValue::String("US".to_string()),
3349            ]))
3350            .unwrap();
3351
3352        table
3353            .add_row(DataRow::new(vec![
3354                DataValue::Integer(3),
3355                DataValue::String("UK".to_string()),
3356            ]))
3357            .unwrap();
3358
3359        let table = Arc::new(table);
3360        let engine = QueryEngine::new();
3361
3362        println!("\n=== Testing NOT IN clause ===");
3363        println!("Table has {} rows", table.row_count());
3364        for i in 0..table.row_count() {
3365            let country = table.get_value(i, 1);
3366            println!("Row {i}: country = {country:?}");
3367        }
3368
3369        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
3370        println!("\n--- Test: country NOT IN ('CA') ---");
3371        let result = engine.execute(
3372            table.clone(),
3373            "SELECT * FROM test WHERE country NOT IN ('CA')",
3374        );
3375        match result {
3376            Ok(view) => {
3377                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3378                assert_eq!(view.row_count(), 2); // Should find US and UK
3379            }
3380            Err(e) => {
3381                panic!("NOT IN query failed: {e}");
3382            }
3383        }
3384
3385        println!("\n=== NOT IN test complete! ===");
3386    }
3387
3388    #[test]
3389    fn test_case_insensitive_in_and_not_in() {
3390        // Initialize tracing for debug output
3391        let _ = tracing_subscriber::fmt()
3392            .with_max_level(tracing::Level::DEBUG)
3393            .try_init();
3394
3395        let mut table = DataTable::new("test");
3396        table.add_column(DataColumn::new("id"));
3397        table.add_column(DataColumn::new("country"));
3398
3399        // Add test data with mixed case
3400        table
3401            .add_row(DataRow::new(vec![
3402                DataValue::Integer(1),
3403                DataValue::String("CA".to_string()), // uppercase
3404            ]))
3405            .unwrap();
3406
3407        table
3408            .add_row(DataRow::new(vec![
3409                DataValue::Integer(2),
3410                DataValue::String("us".to_string()), // lowercase
3411            ]))
3412            .unwrap();
3413
3414        table
3415            .add_row(DataRow::new(vec![
3416                DataValue::Integer(3),
3417                DataValue::String("UK".to_string()), // uppercase
3418            ]))
3419            .unwrap();
3420
3421        let table = Arc::new(table);
3422
3423        println!("\n=== Testing Case-Insensitive IN clause ===");
3424        println!("Table has {} rows", table.row_count());
3425        for i in 0..table.row_count() {
3426            let country = table.get_value(i, 1);
3427            println!("Row {i}: country = {country:?}");
3428        }
3429
3430        // Test case-insensitive IN - should match 'CA' with 'ca'
3431        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3432        let engine = QueryEngine::with_case_insensitive(true);
3433        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3434        match result {
3435            Ok(view) => {
3436                println!(
3437                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3438                    view.row_count()
3439                );
3440                assert_eq!(view.row_count(), 1); // Should find CA row
3441            }
3442            Err(e) => {
3443                panic!("Case-insensitive IN query failed: {e}");
3444            }
3445        }
3446
3447        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
3448        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3449        let result = engine.execute(
3450            table.clone(),
3451            "SELECT * FROM test WHERE country NOT IN ('ca')",
3452        );
3453        match result {
3454            Ok(view) => {
3455                println!(
3456                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3457                    view.row_count()
3458                );
3459                assert_eq!(view.row_count(), 2); // Should find us and UK rows
3460            }
3461            Err(e) => {
3462                panic!("Case-insensitive NOT IN query failed: {e}");
3463            }
3464        }
3465
3466        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
3467        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3468        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
3469        let result = engine_case_sensitive
3470            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3471        match result {
3472            Ok(view) => {
3473                println!(
3474                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3475                    view.row_count()
3476                );
3477                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
3478            }
3479            Err(e) => {
3480                panic!("Case-sensitive IN query failed: {e}");
3481            }
3482        }
3483
3484        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3485    }
3486
3487    #[test]
3488    #[ignore = "Parentheses in WHERE clause not yet implemented"]
3489    fn test_parentheses_in_where_clause() {
3490        // Initialize tracing for debug output
3491        let _ = tracing_subscriber::fmt()
3492            .with_max_level(tracing::Level::DEBUG)
3493            .try_init();
3494
3495        let mut table = DataTable::new("test");
3496        table.add_column(DataColumn::new("id"));
3497        table.add_column(DataColumn::new("status"));
3498        table.add_column(DataColumn::new("priority"));
3499
3500        // Add test data
3501        table
3502            .add_row(DataRow::new(vec![
3503                DataValue::Integer(1),
3504                DataValue::String("Pending".to_string()),
3505                DataValue::String("High".to_string()),
3506            ]))
3507            .unwrap();
3508
3509        table
3510            .add_row(DataRow::new(vec![
3511                DataValue::Integer(2),
3512                DataValue::String("Complete".to_string()),
3513                DataValue::String("High".to_string()),
3514            ]))
3515            .unwrap();
3516
3517        table
3518            .add_row(DataRow::new(vec![
3519                DataValue::Integer(3),
3520                DataValue::String("Pending".to_string()),
3521                DataValue::String("Low".to_string()),
3522            ]))
3523            .unwrap();
3524
3525        table
3526            .add_row(DataRow::new(vec![
3527                DataValue::Integer(4),
3528                DataValue::String("Complete".to_string()),
3529                DataValue::String("Low".to_string()),
3530            ]))
3531            .unwrap();
3532
3533        let table = Arc::new(table);
3534        let engine = QueryEngine::new();
3535
3536        println!("\n=== Testing Parentheses in WHERE clause ===");
3537        println!("Table has {} rows", table.row_count());
3538        for i in 0..table.row_count() {
3539            let status = table.get_value(i, 1);
3540            let priority = table.get_value(i, 2);
3541            println!("Row {i}: status = {status:?}, priority = {priority:?}");
3542        }
3543
3544        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
3545        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3546        let result = engine.execute(
3547            table.clone(),
3548            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3549        );
3550        match result {
3551            Ok(view) => {
3552                println!(
3553                    "SUCCESS: Found {} rows with parenthetical logic",
3554                    view.row_count()
3555                );
3556                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
3557            }
3558            Err(e) => {
3559                panic!("Parentheses query failed: {e}");
3560            }
3561        }
3562
3563        println!("\n=== Parentheses test complete! ===");
3564    }
3565
3566    #[test]
3567    #[ignore = "Numeric type coercion needs fixing"]
3568    fn test_numeric_type_coercion() {
3569        // Initialize tracing for debug output
3570        let _ = tracing_subscriber::fmt()
3571            .with_max_level(tracing::Level::DEBUG)
3572            .try_init();
3573
3574        let mut table = DataTable::new("test");
3575        table.add_column(DataColumn::new("id"));
3576        table.add_column(DataColumn::new("price"));
3577        table.add_column(DataColumn::new("quantity"));
3578
3579        // Add test data with different numeric types
3580        table
3581            .add_row(DataRow::new(vec![
3582                DataValue::Integer(1),
3583                DataValue::Float(99.50), // Contains '.'
3584                DataValue::Integer(100),
3585            ]))
3586            .unwrap();
3587
3588        table
3589            .add_row(DataRow::new(vec![
3590                DataValue::Integer(2),
3591                DataValue::Float(150.0), // Contains '.' and '0'
3592                DataValue::Integer(200),
3593            ]))
3594            .unwrap();
3595
3596        table
3597            .add_row(DataRow::new(vec![
3598                DataValue::Integer(3),
3599                DataValue::Integer(75), // No decimal point
3600                DataValue::Integer(50),
3601            ]))
3602            .unwrap();
3603
3604        let table = Arc::new(table);
3605        let engine = QueryEngine::new();
3606
3607        println!("\n=== Testing Numeric Type Coercion ===");
3608        println!("Table has {} rows", table.row_count());
3609        for i in 0..table.row_count() {
3610            let price = table.get_value(i, 1);
3611            let quantity = table.get_value(i, 2);
3612            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3613        }
3614
3615        // Test Contains on float values - should find rows with decimal points
3616        println!("\n--- Test: price.Contains('.') ---");
3617        let result = engine.execute(
3618            table.clone(),
3619            "SELECT * FROM test WHERE price.Contains('.')",
3620        );
3621        match result {
3622            Ok(view) => {
3623                println!(
3624                    "SUCCESS: Found {} rows with decimal points in price",
3625                    view.row_count()
3626                );
3627                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
3628            }
3629            Err(e) => {
3630                panic!("Numeric Contains query failed: {e}");
3631            }
3632        }
3633
3634        // Test Contains on integer values converted to string
3635        println!("\n--- Test: quantity.Contains('0') ---");
3636        let result = engine.execute(
3637            table.clone(),
3638            "SELECT * FROM test WHERE quantity.Contains('0')",
3639        );
3640        match result {
3641            Ok(view) => {
3642                println!(
3643                    "SUCCESS: Found {} rows with '0' in quantity",
3644                    view.row_count()
3645                );
3646                assert_eq!(view.row_count(), 2); // Should find 100 and 200
3647            }
3648            Err(e) => {
3649                panic!("Integer Contains query failed: {e}");
3650            }
3651        }
3652
3653        println!("\n=== Numeric type coercion test complete! ===");
3654    }
3655
3656    #[test]
3657    fn test_datetime_comparisons() {
3658        // Initialize tracing for debug output
3659        let _ = tracing_subscriber::fmt()
3660            .with_max_level(tracing::Level::DEBUG)
3661            .try_init();
3662
3663        let mut table = DataTable::new("test");
3664        table.add_column(DataColumn::new("id"));
3665        table.add_column(DataColumn::new("created_date"));
3666
3667        // Add test data with date strings (as they would come from CSV)
3668        table
3669            .add_row(DataRow::new(vec![
3670                DataValue::Integer(1),
3671                DataValue::String("2024-12-15".to_string()),
3672            ]))
3673            .unwrap();
3674
3675        table
3676            .add_row(DataRow::new(vec![
3677                DataValue::Integer(2),
3678                DataValue::String("2025-01-15".to_string()),
3679            ]))
3680            .unwrap();
3681
3682        table
3683            .add_row(DataRow::new(vec![
3684                DataValue::Integer(3),
3685                DataValue::String("2025-02-15".to_string()),
3686            ]))
3687            .unwrap();
3688
3689        let table = Arc::new(table);
3690        let engine = QueryEngine::new();
3691
3692        println!("\n=== Testing DateTime Comparisons ===");
3693        println!("Table has {} rows", table.row_count());
3694        for i in 0..table.row_count() {
3695            let date = table.get_value(i, 1);
3696            println!("Row {i}: created_date = {date:?}");
3697        }
3698
3699        // Test DateTime constructor comparison - should find dates after 2025-01-01
3700        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3701        let result = engine.execute(
3702            table.clone(),
3703            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3704        );
3705        match result {
3706            Ok(view) => {
3707                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3708                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
3709            }
3710            Err(e) => {
3711                panic!("DateTime comparison query failed: {e}");
3712            }
3713        }
3714
3715        println!("\n=== DateTime comparison test complete! ===");
3716    }
3717
3718    #[test]
3719    fn test_not_with_method_calls() {
3720        // Initialize tracing for debug output
3721        let _ = tracing_subscriber::fmt()
3722            .with_max_level(tracing::Level::DEBUG)
3723            .try_init();
3724
3725        let mut table = DataTable::new("test");
3726        table.add_column(DataColumn::new("id"));
3727        table.add_column(DataColumn::new("status"));
3728
3729        // Add test data
3730        table
3731            .add_row(DataRow::new(vec![
3732                DataValue::Integer(1),
3733                DataValue::String("Pending Review".to_string()),
3734            ]))
3735            .unwrap();
3736
3737        table
3738            .add_row(DataRow::new(vec![
3739                DataValue::Integer(2),
3740                DataValue::String("Complete".to_string()),
3741            ]))
3742            .unwrap();
3743
3744        table
3745            .add_row(DataRow::new(vec![
3746                DataValue::Integer(3),
3747                DataValue::String("Pending Approval".to_string()),
3748            ]))
3749            .unwrap();
3750
3751        let table = Arc::new(table);
3752        let engine = QueryEngine::with_case_insensitive(true);
3753
3754        println!("\n=== Testing NOT with Method Calls ===");
3755        println!("Table has {} rows", table.row_count());
3756        for i in 0..table.row_count() {
3757            let status = table.get_value(i, 1);
3758            println!("Row {i}: status = {status:?}");
3759        }
3760
3761        // Test NOT with Contains - should exclude rows containing "pend"
3762        println!("\n--- Test: NOT status.Contains('pend') ---");
3763        let result = engine.execute(
3764            table.clone(),
3765            "SELECT * FROM test WHERE NOT status.Contains('pend')",
3766        );
3767        match result {
3768            Ok(view) => {
3769                println!(
3770                    "SUCCESS: Found {} rows NOT containing 'pend'",
3771                    view.row_count()
3772                );
3773                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3774            }
3775            Err(e) => {
3776                panic!("NOT Contains query failed: {e}");
3777            }
3778        }
3779
3780        // Test NOT with StartsWith
3781        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3782        let result = engine.execute(
3783            table.clone(),
3784            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3785        );
3786        match result {
3787            Ok(view) => {
3788                println!(
3789                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
3790                    view.row_count()
3791                );
3792                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3793            }
3794            Err(e) => {
3795                panic!("NOT StartsWith query failed: {e}");
3796            }
3797        }
3798
3799        println!("\n=== NOT with method calls test complete! ===");
3800    }
3801
3802    #[test]
3803    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3804    fn test_complex_logical_expressions() {
3805        // Initialize tracing for debug output
3806        let _ = tracing_subscriber::fmt()
3807            .with_max_level(tracing::Level::DEBUG)
3808            .try_init();
3809
3810        let mut table = DataTable::new("test");
3811        table.add_column(DataColumn::new("id"));
3812        table.add_column(DataColumn::new("status"));
3813        table.add_column(DataColumn::new("priority"));
3814        table.add_column(DataColumn::new("assigned"));
3815
3816        // Add comprehensive test data
3817        table
3818            .add_row(DataRow::new(vec![
3819                DataValue::Integer(1),
3820                DataValue::String("Pending".to_string()),
3821                DataValue::String("High".to_string()),
3822                DataValue::String("John".to_string()),
3823            ]))
3824            .unwrap();
3825
3826        table
3827            .add_row(DataRow::new(vec![
3828                DataValue::Integer(2),
3829                DataValue::String("Complete".to_string()),
3830                DataValue::String("High".to_string()),
3831                DataValue::String("Jane".to_string()),
3832            ]))
3833            .unwrap();
3834
3835        table
3836            .add_row(DataRow::new(vec![
3837                DataValue::Integer(3),
3838                DataValue::String("Pending".to_string()),
3839                DataValue::String("Low".to_string()),
3840                DataValue::String("John".to_string()),
3841            ]))
3842            .unwrap();
3843
3844        table
3845            .add_row(DataRow::new(vec![
3846                DataValue::Integer(4),
3847                DataValue::String("In Progress".to_string()),
3848                DataValue::String("Medium".to_string()),
3849                DataValue::String("Jane".to_string()),
3850            ]))
3851            .unwrap();
3852
3853        let table = Arc::new(table);
3854        let engine = QueryEngine::new();
3855
3856        println!("\n=== Testing Complex Logical Expressions ===");
3857        println!("Table has {} rows", table.row_count());
3858        for i in 0..table.row_count() {
3859            let status = table.get_value(i, 1);
3860            let priority = table.get_value(i, 2);
3861            let assigned = table.get_value(i, 3);
3862            println!(
3863                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3864            );
3865        }
3866
3867        // Test complex AND/OR logic
3868        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3869        let result = engine.execute(
3870            table.clone(),
3871            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3872        );
3873        match result {
3874            Ok(view) => {
3875                println!(
3876                    "SUCCESS: Found {} rows with complex logic",
3877                    view.row_count()
3878                );
3879                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3880            }
3881            Err(e) => {
3882                panic!("Complex logic query failed: {e}");
3883            }
3884        }
3885
3886        // Test NOT with complex expressions
3887        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3888        let result = engine.execute(
3889            table.clone(),
3890            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3891        );
3892        match result {
3893            Ok(view) => {
3894                println!(
3895                    "SUCCESS: Found {} rows with NOT complex logic",
3896                    view.row_count()
3897                );
3898                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3899            }
3900            Err(e) => {
3901                panic!("NOT complex logic query failed: {e}");
3902            }
3903        }
3904
3905        println!("\n=== Complex logical expressions test complete! ===");
3906    }
3907
3908    #[test]
3909    fn test_mixed_data_types_and_edge_cases() {
3910        // Initialize tracing for debug output
3911        let _ = tracing_subscriber::fmt()
3912            .with_max_level(tracing::Level::DEBUG)
3913            .try_init();
3914
3915        let mut table = DataTable::new("test");
3916        table.add_column(DataColumn::new("id"));
3917        table.add_column(DataColumn::new("value"));
3918        table.add_column(DataColumn::new("nullable_field"));
3919
3920        // Add test data with mixed types and edge cases
3921        table
3922            .add_row(DataRow::new(vec![
3923                DataValue::Integer(1),
3924                DataValue::String("123.45".to_string()),
3925                DataValue::String("present".to_string()),
3926            ]))
3927            .unwrap();
3928
3929        table
3930            .add_row(DataRow::new(vec![
3931                DataValue::Integer(2),
3932                DataValue::Float(678.90),
3933                DataValue::Null,
3934            ]))
3935            .unwrap();
3936
3937        table
3938            .add_row(DataRow::new(vec![
3939                DataValue::Integer(3),
3940                DataValue::Boolean(true),
3941                DataValue::String("also present".to_string()),
3942            ]))
3943            .unwrap();
3944
3945        table
3946            .add_row(DataRow::new(vec![
3947                DataValue::Integer(4),
3948                DataValue::String("false".to_string()),
3949                DataValue::Null,
3950            ]))
3951            .unwrap();
3952
3953        let table = Arc::new(table);
3954        let engine = QueryEngine::new();
3955
3956        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3957        println!("Table has {} rows", table.row_count());
3958        for i in 0..table.row_count() {
3959            let value = table.get_value(i, 1);
3960            let nullable = table.get_value(i, 2);
3961            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3962        }
3963
3964        // Test type coercion with boolean Contains
3965        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3966        let result = engine.execute(
3967            table.clone(),
3968            "SELECT * FROM test WHERE value.Contains('true')",
3969        );
3970        match result {
3971            Ok(view) => {
3972                println!(
3973                    "SUCCESS: Found {} rows with boolean coercion",
3974                    view.row_count()
3975                );
3976                assert_eq!(view.row_count(), 1); // Should find the boolean true row
3977            }
3978            Err(e) => {
3979                panic!("Boolean coercion query failed: {e}");
3980            }
3981        }
3982
3983        // Test multiple IN values with mixed types
3984        println!("\n--- Test: id IN (1, 3) ---");
3985        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3986        match result {
3987            Ok(view) => {
3988                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3989                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
3990            }
3991            Err(e) => {
3992                panic!("Multiple IN values query failed: {e}");
3993            }
3994        }
3995
3996        println!("\n=== Mixed data types test complete! ===");
3997    }
3998
3999    /// Test that aggregate-only queries return exactly one row (regression test)
4000    #[test]
4001    fn test_aggregate_only_single_row() {
4002        let table = create_test_stock_data();
4003        let engine = QueryEngine::new();
4004
4005        // Test query with multiple aggregates - should return exactly 1 row
4006        let result = engine
4007            .execute(
4008                table.clone(),
4009                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
4010            )
4011            .expect("Query should succeed");
4012
4013        assert_eq!(
4014            result.row_count(),
4015            1,
4016            "Aggregate-only query should return exactly 1 row"
4017        );
4018        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
4019
4020        // Verify the actual values are correct
4021        let source = result.source();
4022        let row = source.get_row(0).expect("Should have first row");
4023
4024        // COUNT(*) should be 5 (total rows)
4025        assert_eq!(row.values[0], DataValue::Integer(5));
4026
4027        // MIN should be 99.5
4028        assert_eq!(row.values[1], DataValue::Float(99.5));
4029
4030        // MAX should be 105.0
4031        assert_eq!(row.values[2], DataValue::Float(105.0));
4032
4033        // AVG should be approximately 102.4
4034        if let DataValue::Float(avg) = &row.values[3] {
4035            assert!(
4036                (avg - 102.4).abs() < 0.01,
4037                "Average should be approximately 102.4, got {}",
4038                avg
4039            );
4040        } else {
4041            panic!("AVG should return a Float value");
4042        }
4043    }
4044
4045    /// Test single aggregate function returns single row
4046    #[test]
4047    fn test_single_aggregate_single_row() {
4048        let table = create_test_stock_data();
4049        let engine = QueryEngine::new();
4050
4051        let result = engine
4052            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4053            .expect("Query should succeed");
4054
4055        assert_eq!(
4056            result.row_count(),
4057            1,
4058            "Single aggregate query should return exactly 1 row"
4059        );
4060        assert_eq!(result.column_count(), 1, "Should have 1 column");
4061
4062        let source = result.source();
4063        let row = source.get_row(0).expect("Should have first row");
4064        assert_eq!(row.values[0], DataValue::Integer(5));
4065    }
4066
4067    /// Test aggregate with WHERE clause filtering
4068    #[test]
4069    fn test_aggregate_with_where_single_row() {
4070        let table = create_test_stock_data();
4071        let engine = QueryEngine::new();
4072
4073        // Filter to only high-value stocks (>= 103.0) and aggregate
4074        let result = engine
4075            .execute(
4076                table.clone(),
4077                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4078            )
4079            .expect("Query should succeed");
4080
4081        assert_eq!(
4082            result.row_count(),
4083            1,
4084            "Filtered aggregate query should return exactly 1 row"
4085        );
4086        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4087
4088        let source = result.source();
4089        let row = source.get_row(0).expect("Should have first row");
4090
4091        // Should find 2 rows (103.5 and 105.0)
4092        assert_eq!(row.values[0], DataValue::Integer(2));
4093        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
4094        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
4095    }
4096
4097    #[test]
4098    fn test_not_in_parsing() {
4099        use crate::sql::recursive_parser::Parser;
4100
4101        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4102        println!("\n=== Testing NOT IN parsing ===");
4103        println!("Parsing query: {query}");
4104
4105        let mut parser = Parser::new(query);
4106        match parser.parse() {
4107            Ok(statement) => {
4108                println!("Parsed statement: {statement:#?}");
4109                if let Some(where_clause) = statement.where_clause {
4110                    println!("WHERE conditions: {:#?}", where_clause.conditions);
4111                    if let Some(first_condition) = where_clause.conditions.first() {
4112                        println!("First condition expression: {:#?}", first_condition.expr);
4113                    }
4114                }
4115            }
4116            Err(e) => {
4117                panic!("Parse error: {e}");
4118            }
4119        }
4120    }
4121
4122    /// Create test stock data for aggregate testing
4123    fn create_test_stock_data() -> Arc<DataTable> {
4124        let mut table = DataTable::new("stock");
4125
4126        table.add_column(DataColumn::new("symbol"));
4127        table.add_column(DataColumn::new("close"));
4128        table.add_column(DataColumn::new("volume"));
4129
4130        // Add 5 rows of test data
4131        let test_data = vec![
4132            ("AAPL", 99.5, 1000),
4133            ("AAPL", 101.2, 1500),
4134            ("AAPL", 103.5, 2000),
4135            ("AAPL", 105.0, 1200),
4136            ("AAPL", 102.8, 1800),
4137        ];
4138
4139        for (symbol, close, volume) in test_data {
4140            table
4141                .add_row(DataRow::new(vec![
4142                    DataValue::String(symbol.to_string()),
4143                    DataValue::Float(close),
4144                    DataValue::Integer(volume),
4145                ]))
4146                .expect("Should add row successfully");
4147        }
4148
4149        Arc::new(table)
4150    }
4151}
4152
4153#[cfg(test)]
4154#[path = "query_engine_tests.rs"]
4155mod query_engine_tests;