Skip to main content

sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28    CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29    TableFunction,
30};
31
/// Per-query scope information carried through statement execution.
///
/// At present the only state it holds is the set of table aliases in effect.
#[derive(Clone, Debug)]
pub struct ExecutionContext {
    /// Alias -> underlying table or CTE name,
    /// e.g. `"t"` -> `"#tmp_trades"` or `"a"` -> `"data"`.
    alias_map: HashMap<String, String>,
}
39
40impl ExecutionContext {
41    /// Create a new empty execution context
42    pub fn new() -> Self {
43        Self {
44            alias_map: HashMap::new(),
45        }
46    }
47
48    /// Register a table alias
49    pub fn register_alias(&mut self, alias: String, table_name: String) {
50        debug!("Registering alias: {} -> {}", alias, table_name);
51        self.alias_map.insert(alias, table_name);
52    }
53
54    /// Resolve an alias to its actual table name
55    /// Returns the alias itself if not found in the map
56    pub fn resolve_alias(&self, name: &str) -> String {
57        self.alias_map
58            .get(name)
59            .cloned()
60            .unwrap_or_else(|| name.to_string())
61    }
62
63    /// Check if a name is a registered alias
64    pub fn is_alias(&self, name: &str) -> bool {
65        self.alias_map.contains_key(name)
66    }
67
68    /// Get a copy of all registered aliases
69    pub fn get_aliases(&self) -> HashMap<String, String> {
70        self.alias_map.clone()
71    }
72
73    /// Resolve a column reference to its index in the table, handling aliases
74    ///
75    /// This is the unified column resolution function that should be used by all
76    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
77    /// alias resolution behavior.
78    ///
79    /// Resolution strategy:
80    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
81    ///    a. Resolve the alias: t -> actual_table_name
82    ///    b. Try qualified lookup: "actual_table_name.amount"
83    ///    c. Fall back to unqualified: "amount"
84    /// 2. If column_ref has no prefix:
85    ///    a. Try simple column name lookup: "amount"
86    ///    b. Try as qualified name if it contains a dot: "table.column"
87    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88        if let Some(table_prefix) = &column_ref.table_prefix {
89            // Qualified column reference: resolve the alias first
90            let actual_table = self.resolve_alias(table_prefix);
91
92            // Try qualified lookup: "actual_table.column"
93            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95                debug!(
96                    "Resolved {}.{} -> qualified column '{}' at index {}",
97                    table_prefix, column_ref.name, qualified_name, idx
98                );
99                return Ok(idx);
100            }
101
102            // Fall back to unqualified lookup
103            if let Some(idx) = table.get_column_index(&column_ref.name) {
104                debug!(
105                    "Resolved {}.{} -> unqualified column '{}' at index {}",
106                    table_prefix, column_ref.name, column_ref.name, idx
107                );
108                return Ok(idx);
109            }
110
111            // Not found with either qualified or unqualified name
112            Err(anyhow!(
113                "Column '{}' not found. Table '{}' may not support qualified column names",
114                qualified_name,
115                actual_table
116            ))
117        } else {
118            // Unqualified column reference
119            if let Some(idx) = table.get_column_index(&column_ref.name) {
120                debug!(
121                    "Resolved unqualified column '{}' at index {}",
122                    column_ref.name, idx
123                );
124                return Ok(idx);
125            }
126
127            // If the column name contains a dot, try it as a qualified name
128            if column_ref.name.contains('.') {
129                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130                    debug!(
131                        "Resolved '{}' as qualified column at index {}",
132                        column_ref.name, idx
133                    );
134                    return Ok(idx);
135                }
136            }
137
138            // Column not found - provide helpful error
139            let suggestion = self.find_similar_column(table, &column_ref.name);
140            match suggestion {
141                Some(similar) => Err(anyhow!(
142                    "Column '{}' not found. Did you mean '{}'?",
143                    column_ref.name,
144                    similar
145                )),
146                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147            }
148        }
149    }
150
151    /// Find a similar column name using edit distance (for better error messages)
152    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153        let columns = table.column_names();
154        let mut best_match: Option<(String, usize)> = None;
155
156        for col in columns {
157            let distance = edit_distance(name, &col);
158            if distance <= 2 {
159                // Allow up to 2 character differences
160                match best_match {
161                    Some((_, best_dist)) if distance < best_dist => {
162                        best_match = Some((col.clone(), distance));
163                    }
164                    None => {
165                        best_match = Some((col.clone(), distance));
166                    }
167                    _ => {}
168                }
169            }
170        }
171
172        best_match.map(|(name, _)| name)
173    }
174}
175
176impl Default for ExecutionContext {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
/// Levenshtein distance between `a` and `b`, counted in `char`s.
///
/// Returns the minimum number of single-character insertions, deletions and
/// substitutions needed to turn `a` into `b`.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // Turning an empty string into the other one is pure insertion.
    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only two rows of the DP table are ever needed:
    // `above[j]` is the distance from the already-processed prefix of `source`
    // to the first `j` chars of `target`.
    let mut above: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, &from) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, &to) in target.iter().enumerate() {
            let deletion = above[col + 1] + 1;
            let insertion = current[col] + 1;
            let substitution = above[col] + usize::from(from != to);
            current[col + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut above, &mut current);
    }

    above[target.len()]
}
222
223/// Query engine that executes SQL directly on `DataTable`
224#[derive(Clone)]
225pub struct QueryEngine {
226    case_insensitive: bool,
227    date_notation: String,
228    _behavior_config: Option<BehaviorConfig>,
229}
230
231impl Default for QueryEngine {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237impl QueryEngine {
238    #[must_use]
239    pub fn new() -> Self {
240        Self {
241            case_insensitive: false,
242            date_notation: get_date_notation(),
243            _behavior_config: None,
244        }
245    }
246
247    #[must_use]
248    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249        let case_insensitive = config.case_insensitive_default;
250        // Use get_date_notation() to respect environment variable override
251        let date_notation = get_date_notation();
252        Self {
253            case_insensitive,
254            date_notation,
255            _behavior_config: Some(config),
256        }
257    }
258
259    #[must_use]
260    pub fn with_date_notation(_date_notation: String) -> Self {
261        Self {
262            case_insensitive: false,
263            date_notation: get_date_notation(), // Always use the global function
264            _behavior_config: None,
265        }
266    }
267
268    #[must_use]
269    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270        Self {
271            case_insensitive,
272            date_notation: get_date_notation(),
273            _behavior_config: None,
274        }
275    }
276
277    #[must_use]
278    pub fn with_case_insensitive_and_date_notation(
279        case_insensitive: bool,
280        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
281    ) -> Self {
282        Self {
283            case_insensitive,
284            date_notation: get_date_notation(), // Always use the global function
285            _behavior_config: None,
286        }
287    }
288
289    /// Find a column name similar to the given name using edit distance
290    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291        let columns = table.column_names();
292        let mut best_match: Option<(String, usize)> = None;
293
294        for col in columns {
295            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296            // Only suggest if distance is small (likely a typo)
297            // Allow up to 3 edits for longer names
298            let max_distance = if name.len() > 10 { 3 } else { 2 };
299            if distance <= max_distance {
300                match &best_match {
301                    None => best_match = Some((col, distance)),
302                    Some((_, best_dist)) if distance < *best_dist => {
303                        best_match = Some((col, distance));
304                    }
305                    _ => {}
306                }
307            }
308        }
309
310        best_match.map(|(name, _)| name)
311    }
312
313    /// Calculate Levenshtein edit distance between two strings
314    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315        let len1 = s1.len();
316        let len2 = s2.len();
317        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319        for i in 0..=len1 {
320            matrix[i][0] = i;
321        }
322        for j in 0..=len2 {
323            matrix[0][j] = j;
324        }
325
326        for (i, c1) in s1.chars().enumerate() {
327            for (j, c2) in s2.chars().enumerate() {
328                let cost = usize::from(c1 != c2);
329                matrix[i + 1][j + 1] = std::cmp::min(
330                    matrix[i][j + 1] + 1, // deletion
331                    std::cmp::min(
332                        matrix[i + 1][j] + 1, // insertion
333                        matrix[i][j] + cost,  // substitution
334                    ),
335                );
336            }
337        }
338
339        matrix[len1][len2]
340    }
341
342    /// Check if an expression contains UNNEST function call
343    fn contains_unnest(expr: &SqlExpression) -> bool {
344        match expr {
345            // Direct UNNEST variant
346            SqlExpression::Unnest { .. } => true,
347            SqlExpression::FunctionCall { name, args, .. } => {
348                if name.to_uppercase() == "UNNEST" {
349                    return true;
350                }
351                // Check recursively in function arguments
352                args.iter().any(Self::contains_unnest)
353            }
354            SqlExpression::BinaryOp { left, right, .. } => {
355                Self::contains_unnest(left) || Self::contains_unnest(right)
356            }
357            SqlExpression::Not { expr } => Self::contains_unnest(expr),
358            SqlExpression::CaseExpression {
359                when_branches,
360                else_branch,
361            } => {
362                when_branches.iter().any(|branch| {
363                    Self::contains_unnest(&branch.condition)
364                        || Self::contains_unnest(&branch.result)
365                }) || else_branch
366                    .as_ref()
367                    .map_or(false, |e| Self::contains_unnest(e))
368            }
369            SqlExpression::SimpleCaseExpression {
370                expr,
371                when_branches,
372                else_branch,
373            } => {
374                Self::contains_unnest(expr)
375                    || when_branches.iter().any(|branch| {
376                        Self::contains_unnest(&branch.value)
377                            || Self::contains_unnest(&branch.result)
378                    })
379                    || else_branch
380                        .as_ref()
381                        .map_or(false, |e| Self::contains_unnest(e))
382            }
383            SqlExpression::InList { expr, values } => {
384                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385            }
386            SqlExpression::NotInList { expr, values } => {
387                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388            }
389            SqlExpression::Between { expr, lower, upper } => {
390                Self::contains_unnest(expr)
391                    || Self::contains_unnest(lower)
392                    || Self::contains_unnest(upper)
393            }
394            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
397            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399            SqlExpression::ChainedMethodCall { base, args, .. } => {
400                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401            }
402            _ => false,
403        }
404    }
405
406    /// Collect all WindowSpecs from an expression (helper for pre-creating contexts)
407    fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408        match expr {
409            SqlExpression::WindowFunction {
410                window_spec, args, ..
411            } => {
412                // Add this window spec
413                specs.push(window_spec.clone());
414                // Recursively check arguments
415                for arg in args {
416                    Self::collect_window_specs(arg, specs);
417                }
418            }
419            SqlExpression::BinaryOp { left, right, .. } => {
420                Self::collect_window_specs(left, specs);
421                Self::collect_window_specs(right, specs);
422            }
423            SqlExpression::Not { expr } => {
424                Self::collect_window_specs(expr, specs);
425            }
426            SqlExpression::FunctionCall { args, .. } => {
427                for arg in args {
428                    Self::collect_window_specs(arg, specs);
429                }
430            }
431            SqlExpression::CaseExpression {
432                when_branches,
433                else_branch,
434            } => {
435                for branch in when_branches {
436                    Self::collect_window_specs(&branch.condition, specs);
437                    Self::collect_window_specs(&branch.result, specs);
438                }
439                if let Some(else_expr) = else_branch {
440                    Self::collect_window_specs(else_expr, specs);
441                }
442            }
443            SqlExpression::SimpleCaseExpression {
444                expr,
445                when_branches,
446                else_branch,
447            } => {
448                Self::collect_window_specs(expr, specs);
449                for branch in when_branches {
450                    Self::collect_window_specs(&branch.value, specs);
451                    Self::collect_window_specs(&branch.result, specs);
452                }
453                if let Some(else_expr) = else_branch {
454                    Self::collect_window_specs(else_expr, specs);
455                }
456            }
457            SqlExpression::InList { expr, values, .. } => {
458                Self::collect_window_specs(expr, specs);
459                for item in values {
460                    Self::collect_window_specs(item, specs);
461                }
462            }
463            SqlExpression::ChainedMethodCall { base, args, .. } => {
464                Self::collect_window_specs(base, specs);
465                for arg in args {
466                    Self::collect_window_specs(arg, specs);
467                }
468            }
469            // Leaf nodes - no recursion needed
470            SqlExpression::Column(_)
471            | SqlExpression::NumberLiteral(_)
472            | SqlExpression::StringLiteral(_)
473            | SqlExpression::BooleanLiteral(_)
474            | SqlExpression::Null
475            | SqlExpression::DateTimeToday { .. }
476            | SqlExpression::DateTimeConstructor { .. }
477            | SqlExpression::MethodCall { .. } => {}
478            // Catch-all for any other variants
479            _ => {}
480        }
481    }
482
483    /// Check if an expression contains a window function
484    fn contains_window_function(expr: &SqlExpression) -> bool {
485        match expr {
486            SqlExpression::WindowFunction { .. } => true,
487            SqlExpression::BinaryOp { left, right, .. } => {
488                Self::contains_window_function(left) || Self::contains_window_function(right)
489            }
490            SqlExpression::Not { expr } => Self::contains_window_function(expr),
491            SqlExpression::FunctionCall { args, .. } => {
492                args.iter().any(Self::contains_window_function)
493            }
494            SqlExpression::CaseExpression {
495                when_branches,
496                else_branch,
497            } => {
498                when_branches.iter().any(|branch| {
499                    Self::contains_window_function(&branch.condition)
500                        || Self::contains_window_function(&branch.result)
501                }) || else_branch
502                    .as_ref()
503                    .map_or(false, |e| Self::contains_window_function(e))
504            }
505            SqlExpression::SimpleCaseExpression {
506                expr,
507                when_branches,
508                else_branch,
509            } => {
510                Self::contains_window_function(expr)
511                    || when_branches.iter().any(|branch| {
512                        Self::contains_window_function(&branch.value)
513                            || Self::contains_window_function(&branch.result)
514                    })
515                    || else_branch
516                        .as_ref()
517                        .map_or(false, |e| Self::contains_window_function(e))
518            }
519            SqlExpression::InList { expr, values } => {
520                Self::contains_window_function(expr)
521                    || values.iter().any(Self::contains_window_function)
522            }
523            SqlExpression::NotInList { expr, values } => {
524                Self::contains_window_function(expr)
525                    || values.iter().any(Self::contains_window_function)
526            }
527            SqlExpression::Between { expr, lower, upper } => {
528                Self::contains_window_function(expr)
529                    || Self::contains_window_function(lower)
530                    || Self::contains_window_function(upper)
531            }
532            SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533            SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534            SqlExpression::MethodCall { args, .. } => {
535                args.iter().any(Self::contains_window_function)
536            }
537            SqlExpression::ChainedMethodCall { base, args, .. } => {
538                Self::contains_window_function(base)
539                    || args.iter().any(Self::contains_window_function)
540            }
541            _ => false,
542        }
543    }
544
545    /// Extract all window function specifications from select items
546    fn extract_window_specs(
547        items: &[SelectItem],
548    ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549        let mut specs = Vec::new();
550        for (idx, item) in items.iter().enumerate() {
551            if let SelectItem::Expression { expr, .. } = item {
552                Self::collect_window_function_specs(expr, idx, &mut specs);
553            }
554        }
555        specs
556    }
557
558    /// Recursively collect window function specs from an expression
559    fn collect_window_function_specs(
560        expr: &SqlExpression,
561        output_column_index: usize,
562        specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563    ) {
564        match expr {
565            SqlExpression::WindowFunction {
566                name,
567                args,
568                window_spec,
569            } => {
570                specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571                    spec: window_spec.clone(),
572                    function_name: name.clone(),
573                    args: args.clone(),
574                    output_column_index,
575                });
576            }
577            SqlExpression::BinaryOp { left, right, .. } => {
578                Self::collect_window_function_specs(left, output_column_index, specs);
579                Self::collect_window_function_specs(right, output_column_index, specs);
580            }
581            SqlExpression::Not { expr } => {
582                Self::collect_window_function_specs(expr, output_column_index, specs);
583            }
584            SqlExpression::FunctionCall { args, .. } => {
585                for arg in args {
586                    Self::collect_window_function_specs(arg, output_column_index, specs);
587                }
588            }
589            SqlExpression::CaseExpression {
590                when_branches,
591                else_branch,
592            } => {
593                for branch in when_branches {
594                    Self::collect_window_function_specs(
595                        &branch.condition,
596                        output_column_index,
597                        specs,
598                    );
599                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600                }
601                if let Some(e) = else_branch {
602                    Self::collect_window_function_specs(e, output_column_index, specs);
603                }
604            }
605            SqlExpression::SimpleCaseExpression {
606                expr,
607                when_branches,
608                else_branch,
609            } => {
610                Self::collect_window_function_specs(expr, output_column_index, specs);
611                for branch in when_branches {
612                    Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613                    Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614                }
615                if let Some(e) = else_branch {
616                    Self::collect_window_function_specs(e, output_column_index, specs);
617                }
618            }
619            SqlExpression::InList { expr, values } => {
620                Self::collect_window_function_specs(expr, output_column_index, specs);
621                for val in values {
622                    Self::collect_window_function_specs(val, output_column_index, specs);
623                }
624            }
625            SqlExpression::NotInList { expr, values } => {
626                Self::collect_window_function_specs(expr, output_column_index, specs);
627                for val in values {
628                    Self::collect_window_function_specs(val, output_column_index, specs);
629                }
630            }
631            SqlExpression::Between { expr, lower, upper } => {
632                Self::collect_window_function_specs(expr, output_column_index, specs);
633                Self::collect_window_function_specs(lower, output_column_index, specs);
634                Self::collect_window_function_specs(upper, output_column_index, specs);
635            }
636            SqlExpression::InSubquery { expr, .. } => {
637                Self::collect_window_function_specs(expr, output_column_index, specs);
638            }
639            SqlExpression::NotInSubquery { expr, .. } => {
640                Self::collect_window_function_specs(expr, output_column_index, specs);
641            }
642            SqlExpression::MethodCall { args, .. } => {
643                for arg in args {
644                    Self::collect_window_function_specs(arg, output_column_index, specs);
645                }
646            }
647            SqlExpression::ChainedMethodCall { base, args, .. } => {
648                Self::collect_window_function_specs(base, output_column_index, specs);
649                for arg in args {
650                    Self::collect_window_function_specs(arg, output_column_index, specs);
651                }
652            }
653            _ => {} // Other expression types don't contain window functions
654        }
655    }
656
657    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
658    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659        let (view, _plan) = self.execute_with_plan(table, sql)?;
660        Ok(view)
661    }
662
663    /// Execute a SQL query with optional temp table registry access
664    pub fn execute_with_temp_tables(
665        &self,
666        table: Arc<DataTable>,
667        sql: &str,
668        temp_tables: Option<&TempTableRegistry>,
669    ) -> Result<DataView> {
670        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671        Ok(view)
672    }
673
674    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
675    pub fn execute_statement(
676        &self,
677        table: Arc<DataTable>,
678        statement: SelectStatement,
679    ) -> Result<DataView> {
680        self.execute_statement_with_temp_tables(table, statement, None)
681    }
682
683    /// Execute a parsed SelectStatement with optional temp table access
684    pub fn execute_statement_with_temp_tables(
685        &self,
686        table: Arc<DataTable>,
687        statement: SelectStatement,
688        temp_tables: Option<&TempTableRegistry>,
689    ) -> Result<DataView> {
690        // First process CTEs to build context
691        let mut cte_context = HashMap::new();
692
693        // Add temp tables to CTE context if provided
694        if let Some(temp_registry) = temp_tables {
695            for table_name in temp_registry.list_tables() {
696                if let Some(temp_table) = temp_registry.get(&table_name) {
697                    debug!("Adding temp table {} to CTE context", table_name);
698                    let view = DataView::new(temp_table);
699                    cte_context.insert(table_name, Arc::new(view));
700                }
701            }
702        }
703
704        for cte in &statement.ctes {
705            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
706            // Execute the CTE based on its type
707            let cte_result = match &cte.cte_type {
708                CTEType::Standard(query) => {
709                    // Execute the CTE query (it might reference earlier CTEs)
710                    let view = self.build_view_with_context(
711                        table.clone(),
712                        query.clone(),
713                        &mut cte_context,
714                    )?;
715
716                    // Materialize the view and enrich columns with qualified names
717                    let mut materialized = self.materialize_view(view)?;
718
719                    // Enrich columns with qualified names for proper scoping
720                    for column in materialized.columns_mut() {
721                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
722                        column.source_table = Some(cte.name.clone());
723                    }
724
725                    DataView::new(Arc::new(materialized))
726                }
727                CTEType::Web(web_spec) => {
728                    // Fetch data from URL
729                    use crate::web::http_fetcher::WebDataFetcher;
730
731                    let fetcher = WebDataFetcher::new()?;
732                    // Pass None for query context (no full SQL available in these contexts)
733                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
734
735                    // Enrich columns with qualified names for proper scoping
736                    for column in data_table.columns_mut() {
737                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
738                        column.source_table = Some(cte.name.clone());
739                    }
740
741                    // Convert DataTable to DataView
742                    DataView::new(Arc::new(data_table))
743                }
744                CTEType::File(file_spec) => {
745                    let mut data_table =
746                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
747
748                    for column in data_table.columns_mut() {
749                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
750                        column.source_table = Some(cte.name.clone());
751                    }
752
753                    DataView::new(Arc::new(data_table))
754                }
755            };
756            // Store the result in the context for later use
757            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
758            debug!(
759                "QueryEngine: CTE '{}' pre-processed, stored in context",
760                cte.name
761            );
762        }
763
764        // Now process subqueries with CTE context available
765        let mut subquery_executor =
766            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
767        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
768
769        // Build the view with the same CTE context
770        self.build_view_with_context(table, processed_statement, &mut cte_context)
771    }
772
773    /// Execute a statement with provided CTE context (for subqueries)
774    pub fn execute_statement_with_cte_context(
775        &self,
776        table: Arc<DataTable>,
777        statement: SelectStatement,
778        cte_context: &HashMap<String, Arc<DataView>>,
779    ) -> Result<DataView> {
780        // Clone the context so we can add any CTEs from this statement
781        let mut local_context = cte_context.clone();
782
783        // Process any CTEs in this statement (they might be nested)
784        for cte in &statement.ctes {
785            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
786            let cte_result = match &cte.cte_type {
787                CTEType::Standard(query) => {
788                    let view = self.build_view_with_context(
789                        table.clone(),
790                        query.clone(),
791                        &mut local_context,
792                    )?;
793
794                    // Materialize the view and enrich columns with qualified names
795                    let mut materialized = self.materialize_view(view)?;
796
797                    // Enrich columns with qualified names for proper scoping
798                    for column in materialized.columns_mut() {
799                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
800                        column.source_table = Some(cte.name.clone());
801                    }
802
803                    DataView::new(Arc::new(materialized))
804                }
805                CTEType::Web(web_spec) => {
806                    // Fetch data from URL
807                    use crate::web::http_fetcher::WebDataFetcher;
808
809                    let fetcher = WebDataFetcher::new()?;
810                    // Pass None for query context (no full SQL available in these contexts)
811                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
812
813                    // Enrich columns with qualified names for proper scoping
814                    for column in data_table.columns_mut() {
815                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
816                        column.source_table = Some(cte.name.clone());
817                    }
818
819                    // Convert DataTable to DataView
820                    DataView::new(Arc::new(data_table))
821                }
822                CTEType::File(file_spec) => {
823                    let mut data_table =
824                        crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
825
826                    for column in data_table.columns_mut() {
827                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
828                        column.source_table = Some(cte.name.clone());
829                    }
830
831                    DataView::new(Arc::new(data_table))
832                }
833            };
834            local_context.insert(cte.name.clone(), Arc::new(cte_result));
835        }
836
837        // Process subqueries with the complete context
838        let mut subquery_executor =
839            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
840        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
841
842        // Build the view
843        self.build_view_with_context(table, processed_statement, &mut local_context)
844    }
845
846    /// Execute a query and return both the result and the execution plan
847    pub fn execute_with_plan(
848        &self,
849        table: Arc<DataTable>,
850        sql: &str,
851    ) -> Result<(DataView, ExecutionPlan)> {
852        self.execute_with_plan_and_temp_tables(table, sql, None)
853    }
854
855    /// Execute a query with temp tables and return both the result and the execution plan
856    pub fn execute_with_plan_and_temp_tables(
857        &self,
858        table: Arc<DataTable>,
859        sql: &str,
860        temp_tables: Option<&TempTableRegistry>,
861    ) -> Result<(DataView, ExecutionPlan)> {
862        let mut plan_builder = ExecutionPlanBuilder::new();
863        let start_time = Instant::now();
864
865        // Parse the SQL query
866        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
867        plan_builder.add_detail(format!("Query: {}", sql));
868        let mut parser = Parser::new(sql);
869        let statement = parser
870            .parse()
871            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
872        plan_builder.add_detail(format!("Parsed successfully"));
873        if let Some(ref from_source) = statement.from_source {
874            match from_source {
875                TableSource::Table(name) => {
876                    plan_builder.add_detail(format!("FROM: {}", name));
877                }
878                TableSource::DerivedTable { alias, .. } => {
879                    plan_builder.add_detail(format!("FROM: derived table (alias: {})", alias));
880                }
881                TableSource::Pivot { .. } => {
882                    plan_builder.add_detail("FROM: PIVOT".to_string());
883                }
884            }
885        }
886        if statement.where_clause.is_some() {
887            plan_builder.add_detail("WHERE clause present".to_string());
888        }
889        plan_builder.end_step();
890
891        // First process CTEs to build context
892        let mut cte_context = HashMap::new();
893
894        // Add temp tables to CTE context if provided
895        if let Some(temp_registry) = temp_tables {
896            for table_name in temp_registry.list_tables() {
897                if let Some(temp_table) = temp_registry.get(&table_name) {
898                    debug!("Adding temp table {} to CTE context", table_name);
899                    let view = DataView::new(temp_table);
900                    cte_context.insert(table_name, Arc::new(view));
901                }
902            }
903        }
904
905        if !statement.ctes.is_empty() {
906            plan_builder.begin_step(
907                StepType::CTE,
908                format!("Process {} CTEs", statement.ctes.len()),
909            );
910
911            for cte in &statement.ctes {
912                let cte_start = Instant::now();
913                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
914
915                let cte_result = match &cte.cte_type {
916                    CTEType::Standard(query) => {
917                        // Add CTE query details
918                        if let Some(ref from_source) = query.from_source {
919                            match from_source {
920                                TableSource::Table(name) => {
921                                    plan_builder.add_detail(format!("Source: {}", name));
922                                }
923                                TableSource::DerivedTable { alias, .. } => {
924                                    plan_builder
925                                        .add_detail(format!("Source: derived table ({})", alias));
926                                }
927                                TableSource::Pivot { .. } => {
928                                    plan_builder.add_detail("Source: PIVOT".to_string());
929                                }
930                            }
931                        }
932                        if query.where_clause.is_some() {
933                            plan_builder.add_detail("Has WHERE clause".to_string());
934                        }
935                        if query.group_by.is_some() {
936                            plan_builder.add_detail("Has GROUP BY".to_string());
937                        }
938
939                        debug!(
940                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
941                            cte.name,
942                            cte_context.keys().collect::<Vec<_>>()
943                        );
944
945                        // Process subqueries in the CTE's query FIRST
946                        // This allows the subqueries to see all previously defined CTEs
947                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
948                            self.clone(),
949                            table.clone(),
950                            cte_context.clone(),
951                        );
952                        let processed_query = subquery_executor.execute_subqueries(query)?;
953
954                        let view = self.build_view_with_context(
955                            table.clone(),
956                            processed_query,
957                            &mut cte_context,
958                        )?;
959
960                        // Materialize the view and enrich columns with qualified names
961                        let mut materialized = self.materialize_view(view)?;
962
963                        // Enrich columns with qualified names for proper scoping
964                        for column in materialized.columns_mut() {
965                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
966                            column.source_table = Some(cte.name.clone());
967                        }
968
969                        DataView::new(Arc::new(materialized))
970                    }
971                    CTEType::Web(web_spec) => {
972                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
973                        if let Some(format) = &web_spec.format {
974                            plan_builder.add_detail(format!("Format: {:?}", format));
975                        }
976                        if let Some(cache) = web_spec.cache_seconds {
977                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
978                        }
979
980                        // Fetch data from URL
981                        use crate::web::http_fetcher::WebDataFetcher;
982
983                        let fetcher = WebDataFetcher::new()?;
984                        // Pass None for query context - each WEB CTE is independent
985                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
986
987                        // Enrich columns with qualified names for proper scoping
988                        for column in data_table.columns_mut() {
989                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
990                            column.source_table = Some(cte.name.clone());
991                        }
992
993                        // Convert DataTable to DataView
994                        DataView::new(Arc::new(data_table))
995                    }
996                    CTEType::File(file_spec) => {
997                        plan_builder.add_detail(format!("PATH: {}", file_spec.path));
998                        if file_spec.recursive {
999                            plan_builder.add_detail("RECURSIVE".to_string());
1000                        }
1001                        if let Some(ref g) = file_spec.glob {
1002                            plan_builder.add_detail(format!("GLOB: {}", g));
1003                        }
1004                        if let Some(d) = file_spec.max_depth {
1005                            plan_builder.add_detail(format!("MAX_DEPTH: {}", d));
1006                        }
1007
1008                        let mut data_table =
1009                            crate::data::file_walker::walk_filesystem(file_spec, &cte.name)?;
1010
1011                        for column in data_table.columns_mut() {
1012                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1013                            column.source_table = Some(cte.name.clone());
1014                        }
1015
1016                        DataView::new(Arc::new(data_table))
1017                    }
1018                };
1019
1020                // Record CTE statistics
1021                plan_builder.set_rows_out(cte_result.row_count());
1022                plan_builder.add_detail(format!(
1023                    "Result: {} rows, {} columns",
1024                    cte_result.row_count(),
1025                    cte_result.column_count()
1026                ));
1027                plan_builder.add_detail(format!(
1028                    "Execution time: {:.3}ms",
1029                    cte_start.elapsed().as_secs_f64() * 1000.0
1030                ));
1031
1032                debug!(
1033                    "QueryEngine: Storing CTE '{}' in context with {} rows",
1034                    cte.name,
1035                    cte_result.row_count()
1036                );
1037                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1038                plan_builder.end_step();
1039            }
1040
1041            plan_builder.add_detail(format!(
1042                "All {} CTEs cached in context",
1043                statement.ctes.len()
1044            ));
1045            plan_builder.end_step();
1046        }
1047
1048        // Process subqueries in the statement with CTE context
1049        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
1050        let mut subquery_executor =
1051            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
1052
1053        // Check if there are subqueries to process
1054        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
1055            // This is a simplified check - in reality we'd need to walk the AST
1056            format!("{:?}", w).contains("Subquery")
1057        });
1058
1059        if has_subqueries {
1060            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
1061        }
1062
1063        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
1064
1065        if has_subqueries {
1066            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1067        } else {
1068            plan_builder.add_detail("No subqueries to process".to_string());
1069        }
1070
1071        plan_builder.end_step();
1072        let result = self.build_view_with_context_and_plan(
1073            table,
1074            processed_statement,
1075            &mut cte_context,
1076            &mut plan_builder,
1077        )?;
1078
1079        let total_duration = start_time.elapsed();
1080        info!(
1081            "Query execution complete: total={:?}, rows={}",
1082            total_duration,
1083            result.row_count()
1084        );
1085
1086        let plan = plan_builder.build();
1087        Ok((result, plan))
1088    }
1089
1090    /// Build a `DataView` from a parsed SQL statement
1091    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1092        let mut cte_context = HashMap::new();
1093        self.build_view_with_context(table, statement, &mut cte_context)
1094    }
1095
1096    /// Build a DataView from a SelectStatement with CTE context
1097    fn build_view_with_context(
1098        &self,
1099        table: Arc<DataTable>,
1100        statement: SelectStatement,
1101        cte_context: &mut HashMap<String, Arc<DataView>>,
1102    ) -> Result<DataView> {
1103        let mut dummy_plan = ExecutionPlanBuilder::new();
1104        let mut exec_context = ExecutionContext::new();
1105        self.build_view_with_context_and_plan_and_exec(
1106            table,
1107            statement,
1108            cte_context,
1109            &mut dummy_plan,
1110            &mut exec_context,
1111        )
1112    }
1113
1114    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
1115    fn build_view_with_context_and_plan(
1116        &self,
1117        table: Arc<DataTable>,
1118        statement: SelectStatement,
1119        cte_context: &mut HashMap<String, Arc<DataView>>,
1120        plan: &mut ExecutionPlanBuilder,
1121    ) -> Result<DataView> {
1122        let mut exec_context = ExecutionContext::new();
1123        self.build_view_with_context_and_plan_and_exec(
1124            table,
1125            statement,
1126            cte_context,
1127            plan,
1128            &mut exec_context,
1129        )
1130    }
1131
1132    /// Build a DataView with CTE context, execution plan, and alias resolution context
1133    fn build_view_with_context_and_plan_and_exec(
1134        &self,
1135        table: Arc<DataTable>,
1136        statement: SelectStatement,
1137        cte_context: &mut HashMap<String, Arc<DataView>>,
1138        plan: &mut ExecutionPlanBuilder,
1139        exec_context: &mut ExecutionContext,
1140    ) -> Result<DataView> {
1141        // First, process any CTEs that aren't already in the context
1142        for cte in &statement.ctes {
1143            // Skip if already processed (e.g., by execute_select for WEB CTEs)
1144            if cte_context.contains_key(&cte.name) {
1145                debug!(
1146                    "QueryEngine: CTE '{}' already in context, skipping",
1147                    cte.name
1148                );
1149                continue;
1150            }
1151
1152            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1153            debug!(
1154                "QueryEngine: Available CTEs for '{}': {:?}",
1155                cte.name,
1156                cte_context.keys().collect::<Vec<_>>()
1157            );
1158
1159            // Execute the CTE query (it might reference earlier CTEs)
1160            let cte_result = match &cte.cte_type {
1161                CTEType::Standard(query) => {
1162                    let view =
1163                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1164
1165                    // Materialize the view and enrich columns with qualified names
1166                    let mut materialized = self.materialize_view(view)?;
1167
1168                    // Enrich columns with qualified names for proper scoping
1169                    for column in materialized.columns_mut() {
1170                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1171                        column.source_table = Some(cte.name.clone());
1172                    }
1173
1174                    DataView::new(Arc::new(materialized))
1175                }
1176                CTEType::Web(_web_spec) => {
1177                    // Web CTEs should have been processed earlier in execute_select
1178                    return Err(anyhow!(
1179                        "Web CTEs should be processed in execute_select method"
1180                    ));
1181                }
1182                CTEType::File(_file_spec) => {
1183                    // FILE CTEs (like WEB) should be processed earlier in execute_select
1184                    return Err(anyhow!(
1185                        "FILE CTEs should be processed in execute_select method"
1186                    ));
1187                }
1188            };
1189
1190            // Store the result in the context for later use
1191            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1192            debug!(
1193                "QueryEngine: CTE '{}' processed, stored in context",
1194                cte.name
1195            );
1196        }
1197
1198        // Determine the source table for the main query
1199        let source_table = if let Some(ref from_source) = statement.from_source {
1200            match from_source {
1201                TableSource::Table(table_name) => {
1202                    // Check if this references a CTE
1203                    if let Some(cte_view) = cte_context.get(table_name) {
1204                        debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1205                        // Materialize the CTE view as a table
1206                        let mut materialized = self.materialize_view((**cte_view).clone())?;
1207
1208                        // Apply alias to qualified column names if present
1209                        #[allow(deprecated)]
1210                        if let Some(ref alias) = statement.from_alias {
1211                            debug!(
1212                                "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1213                                alias, table_name
1214                            );
1215                            for column in materialized.columns_mut() {
1216                                // Replace the CTE name with the alias in qualified names
1217                                if let Some(ref qualified_name) = column.qualified_name {
1218                                    if qualified_name.starts_with(&format!("{}.", table_name)) {
1219                                        column.qualified_name = Some(qualified_name.replace(
1220                                            &format!("{}.", table_name),
1221                                            &format!("{}.", alias),
1222                                        ));
1223                                    }
1224                                }
1225                                // Update source table to reflect the alias
1226                                if column.source_table.as_ref() == Some(table_name) {
1227                                    column.source_table = Some(alias.clone());
1228                                }
1229                            }
1230                        }
1231
1232                        Arc::new(materialized)
1233                    } else {
1234                        // Regular table reference - use the provided table
1235                        table.clone()
1236                    }
1237                }
1238                TableSource::DerivedTable { query, alias } => {
1239                    // Execute the subquery and use its result as the source
1240                    debug!(
1241                        "QueryEngine: Processing FROM derived table (alias: {})",
1242                        alias
1243                    );
1244                    let subquery_result =
1245                        self.build_view_with_context(table.clone(), *query.clone(), cte_context)?;
1246
1247                    // Convert the DataView to a DataTable for use as source
1248                    // This materializes the subquery result
1249                    let mut materialized = self.materialize_view(subquery_result)?;
1250
1251                    // Apply the alias to all columns in the derived table
1252                    // Note: We set source_table but keep the column names unqualified
1253                    // so they can be referenced without the table prefix
1254                    for column in materialized.columns_mut() {
1255                        column.source_table = Some(alias.clone());
1256                    }
1257
1258                    Arc::new(materialized)
1259                }
1260                TableSource::Pivot { .. } => {
1261                    // PIVOT should have been expanded by PivotExpander transformer
1262                    return Err(anyhow!(
1263                        "PIVOT in FROM clause should have been expanded by preprocessing pipeline"
1264                    ));
1265                }
1266            }
1267        } else {
1268            // Fallback to deprecated fields for backward compatibility
1269            #[allow(deprecated)]
1270            if let Some(ref table_func) = statement.from_function {
1271                // Handle table functions like RANGE()
1272                debug!("QueryEngine: Processing table function (deprecated field)...");
1273                match table_func {
1274                    TableFunction::Generator { name, args } => {
1275                        // Use the generator registry to create the table
1276                        use crate::sql::generators::GeneratorRegistry;
1277
1278                        // Create generator registry (could be cached in QueryEngine)
1279                        let registry = GeneratorRegistry::new();
1280
1281                        if let Some(generator) = registry.get(name) {
1282                            // Evaluate arguments
1283                            let mut evaluator = ArithmeticEvaluator::with_date_notation(
1284                                &table,
1285                                self.date_notation.clone(),
1286                            );
1287                            let dummy_row = 0;
1288
1289                            let mut evaluated_args = Vec::new();
1290                            for arg in args {
1291                                evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1292                            }
1293
1294                            // Generate the table
1295                            generator.generate(evaluated_args)?
1296                        } else {
1297                            return Err(anyhow!("Unknown generator function: {}", name));
1298                        }
1299                    }
1300                }
1301            } else {
1302                #[allow(deprecated)]
1303                if let Some(ref subquery) = statement.from_subquery {
1304                    // Execute the subquery and use its result as the source
1305                    debug!("QueryEngine: Processing FROM subquery (deprecated field)...");
1306                    let subquery_result = self.build_view_with_context(
1307                        table.clone(),
1308                        *subquery.clone(),
1309                        cte_context,
1310                    )?;
1311
1312                    // Convert the DataView to a DataTable for use as source
1313                    // This materializes the subquery result
1314                    let materialized = self.materialize_view(subquery_result)?;
1315                    Arc::new(materialized)
1316                } else {
1317                    #[allow(deprecated)]
1318                    if let Some(ref table_name) = statement.from_table {
1319                        // Check if this references a CTE
1320                        if let Some(cte_view) = cte_context.get(table_name) {
1321                            debug!(
1322                                "QueryEngine: Using CTE '{}' as source table (deprecated field)",
1323                                table_name
1324                            );
1325                            // Materialize the CTE view as a table
1326                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1327
1328                            // Apply alias to qualified column names if present
1329                            #[allow(deprecated)]
1330                            if let Some(ref alias) = statement.from_alias {
1331                                debug!(
1332                                    "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1333                                    alias, table_name
1334                                );
1335                                for column in materialized.columns_mut() {
1336                                    // Replace the CTE name with the alias in qualified names
1337                                    if let Some(ref qualified_name) = column.qualified_name {
1338                                        if qualified_name.starts_with(&format!("{}.", table_name)) {
1339                                            column.qualified_name = Some(qualified_name.replace(
1340                                                &format!("{}.", table_name),
1341                                                &format!("{}.", alias),
1342                                            ));
1343                                        }
1344                                    }
1345                                    // Update source table to reflect the alias
1346                                    if column.source_table.as_ref() == Some(table_name) {
1347                                        column.source_table = Some(alias.clone());
1348                                    }
1349                                }
1350                            }
1351
1352                            Arc::new(materialized)
1353                        } else {
1354                            // Regular table reference - use the provided table
1355                            table.clone()
1356                        }
1357                    } else {
1358                        // No FROM clause - use the provided table
1359                        table.clone()
1360                    }
1361                }
1362            }
1363        };
1364
1365        // Register alias in execution context if present
1366        #[allow(deprecated)]
1367        if let Some(ref alias) = statement.from_alias {
1368            #[allow(deprecated)]
1369            if let Some(ref table_name) = statement.from_table {
1370                exec_context.register_alias(alias.clone(), table_name.clone());
1371            }
1372        }
1373
1374        // Process JOINs if present
1375        let final_table = if !statement.joins.is_empty() {
1376            plan.begin_step(
1377                StepType::Join,
1378                format!("Process {} JOINs", statement.joins.len()),
1379            );
1380            plan.set_rows_in(source_table.row_count());
1381
1382            let join_executor = HashJoinExecutor::new(self.case_insensitive);
1383            let mut current_table = source_table;
1384
1385            for (idx, join_clause) in statement.joins.iter().enumerate() {
1386                let join_start = Instant::now();
1387                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1388                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1389                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1390                plan.add_detail(format!(
1391                    "Executing {:?} JOIN on {} condition(s)",
1392                    join_clause.join_type,
1393                    join_clause.condition.conditions.len()
1394                ));
1395
1396                // Resolve the right table for the join
1397                let right_table = match &join_clause.table {
1398                    TableSource::Table(name) => {
1399                        // Check if it's a CTE reference
1400                        if let Some(cte_view) = cte_context.get(name) {
1401                            let mut materialized = self.materialize_view((**cte_view).clone())?;
1402
1403                            // Apply alias to qualified column names if present
1404                            if let Some(ref alias) = join_clause.alias {
1405                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1406                                for column in materialized.columns_mut() {
1407                                    // Replace the CTE name with the alias in qualified names
1408                                    if let Some(ref qualified_name) = column.qualified_name {
1409                                        if qualified_name.starts_with(&format!("{}.", name)) {
1410                                            column.qualified_name = Some(qualified_name.replace(
1411                                                &format!("{}.", name),
1412                                                &format!("{}.", alias),
1413                                            ));
1414                                        }
1415                                    }
1416                                    // Update source table to reflect the alias
1417                                    if column.source_table.as_ref() == Some(name) {
1418                                        column.source_table = Some(alias.clone());
1419                                    }
1420                                }
1421                            }
1422
1423                            Arc::new(materialized)
1424                        } else {
1425                            // For now, we need the actual table data
1426                            // In a real implementation, this would load from file
1427                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1428                        }
1429                    }
1430                    TableSource::DerivedTable { query, alias: _ } => {
1431                        // Execute the subquery
1432                        let subquery_result = self.build_view_with_context(
1433                            table.clone(),
1434                            *query.clone(),
1435                            cte_context,
1436                        )?;
1437                        let materialized = self.materialize_view(subquery_result)?;
1438                        Arc::new(materialized)
1439                    }
1440                    TableSource::Pivot { .. } => {
1441                        // PIVOT in JOIN is not supported yet (will be handled by transformer)
1442                        return Err(anyhow!("PIVOT in JOIN clause is not yet supported"));
1443                    }
1444                };
1445
1446                // Execute the join
1447                let joined = join_executor.execute_join(
1448                    current_table.clone(),
1449                    join_clause,
1450                    right_table.clone(),
1451                )?;
1452
1453                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1454                plan.set_rows_out(joined.row_count());
1455                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1456                plan.add_detail(format!(
1457                    "Join time: {:.3}ms",
1458                    join_start.elapsed().as_secs_f64() * 1000.0
1459                ));
1460                plan.end_step();
1461
1462                current_table = Arc::new(joined);
1463            }
1464
1465            plan.set_rows_out(current_table.row_count());
1466            plan.add_detail(format!(
1467                "Final result after all joins: {} rows",
1468                current_table.row_count()
1469            ));
1470            plan.end_step();
1471            current_table
1472        } else {
1473            source_table
1474        };
1475
1476        // Continue with the existing build_view logic but using final_table
1477        self.build_view_internal_with_plan_and_exec(
1478            final_table,
1479            statement,
1480            plan,
1481            Some(exec_context),
1482        )
1483    }
1484
1485    /// Materialize a DataView into a new DataTable
1486    pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1487        let source = view.source();
1488        let mut result_table = DataTable::new("derived");
1489
1490        // Get the visible columns from the view
1491        let visible_cols = view.visible_column_indices().to_vec();
1492
1493        // Copy column definitions
1494        for col_idx in &visible_cols {
1495            let col = &source.columns[*col_idx];
1496            let new_col = DataColumn {
1497                name: col.name.clone(),
1498                data_type: col.data_type.clone(),
1499                nullable: col.nullable,
1500                unique_values: col.unique_values,
1501                null_count: col.null_count,
1502                metadata: col.metadata.clone(),
1503                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1504                source_table: col.source_table.clone(),     // Preserve source table
1505            };
1506            result_table.add_column(new_col);
1507        }
1508
1509        // Copy visible rows
1510        for row_idx in view.visible_row_indices() {
1511            let source_row = &source.rows[*row_idx];
1512            let mut new_row = DataRow { values: Vec::new() };
1513
1514            for col_idx in &visible_cols {
1515                new_row.values.push(source_row.values[*col_idx].clone());
1516            }
1517
1518            result_table.add_row(new_row);
1519        }
1520
1521        Ok(result_table)
1522    }
1523
1524    fn build_view_internal(
1525        &self,
1526        table: Arc<DataTable>,
1527        statement: SelectStatement,
1528    ) -> Result<DataView> {
1529        let mut dummy_plan = ExecutionPlanBuilder::new();
1530        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1531    }
1532
1533    fn build_view_internal_with_plan(
1534        &self,
1535        table: Arc<DataTable>,
1536        statement: SelectStatement,
1537        plan: &mut ExecutionPlanBuilder,
1538    ) -> Result<DataView> {
1539        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1540    }
1541
1542    fn build_view_internal_with_plan_and_exec(
1543        &self,
1544        table: Arc<DataTable>,
1545        statement: SelectStatement,
1546        plan: &mut ExecutionPlanBuilder,
1547        exec_context: Option<&ExecutionContext>,
1548    ) -> Result<DataView> {
1549        debug!(
1550            "QueryEngine::build_view - select_items: {:?}",
1551            statement.select_items
1552        );
1553        debug!(
1554            "QueryEngine::build_view - where_clause: {:?}",
1555            statement.where_clause
1556        );
1557
1558        // Start with all rows visible
1559        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1560
1561        // Apply WHERE clause filtering using recursive evaluator
1562        if let Some(where_clause) = &statement.where_clause {
1563            let total_rows = table.row_count();
1564            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1565            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1566
1567            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1568            plan.set_rows_in(total_rows);
1569            plan.add_detail(format!("Input: {} rows", total_rows));
1570
1571            // Add details about WHERE conditions
1572            for condition in &where_clause.conditions {
1573                plan.add_detail(format!("Condition: {:?}", condition.expr));
1574            }
1575
1576            let filter_start = Instant::now();
1577            // Create an evaluation context for caching compiled regexes
1578            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1579
1580            // Create evaluator ONCE before the loop for performance
1581            let mut evaluator = if let Some(exec_ctx) = exec_context {
1582                // Use both contexts: exec_context for alias resolution, eval_context for regex caching
1583                RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1584            } else {
1585                RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1586            };
1587
1588            // Filter visible rows based on WHERE clause
1589            let mut filtered_rows = Vec::new();
1590            for row_idx in visible_rows {
1591                // Only log for first few rows to avoid performance impact
1592                if row_idx < 3 {
1593                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1594                }
1595
1596                match evaluator.evaluate(where_clause, row_idx) {
1597                    Ok(result) => {
1598                        if row_idx < 3 {
1599                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1600                        }
1601                        if result {
1602                            filtered_rows.push(row_idx);
1603                        }
1604                    }
1605                    Err(e) => {
1606                        if row_idx < 3 {
1607                            debug!(
1608                                "QueryEngine: WHERE evaluation error for row {}: {}",
1609                                row_idx, e
1610                            );
1611                        }
1612                        // Propagate WHERE clause errors instead of silently ignoring them
1613                        return Err(e);
1614                    }
1615                }
1616            }
1617
1618            // Log regex cache statistics
1619            let (compilations, cache_hits) = eval_context.get_stats();
1620            if compilations > 0 || cache_hits > 0 {
1621                debug!(
1622                    "LIKE pattern cache: {} compilations, {} cache hits",
1623                    compilations, cache_hits
1624                );
1625            }
1626            visible_rows = filtered_rows;
1627            let filter_duration = filter_start.elapsed();
1628            info!(
1629                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1630                total_rows,
1631                visible_rows.len(),
1632                filter_duration
1633            );
1634
1635            plan.set_rows_out(visible_rows.len());
1636            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1637            plan.add_detail(format!(
1638                "Filter time: {:.3}ms",
1639                filter_duration.as_secs_f64() * 1000.0
1640            ));
1641            plan.end_step();
1642        }
1643
1644        // Create initial DataView with filtered rows
1645        let mut view = DataView::new(table.clone());
1646        view = view.with_rows(visible_rows);
1647
1648        // Handle GROUP BY if present
1649        if let Some(group_by_exprs) = &statement.group_by {
1650            if !group_by_exprs.is_empty() {
1651                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1652
1653                plan.begin_step(
1654                    StepType::GroupBy,
1655                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1656                );
1657                plan.set_rows_in(view.row_count());
1658                plan.add_detail(format!("Input: {} rows", view.row_count()));
1659                for expr in group_by_exprs {
1660                    plan.add_detail(format!("Group by: {:?}", expr));
1661                }
1662
1663                let group_start = Instant::now();
1664                view = self.apply_group_by(
1665                    view,
1666                    group_by_exprs,
1667                    &statement.select_items,
1668                    statement.having.as_ref(),
1669                    plan,
1670                )?;
1671
1672                plan.set_rows_out(view.row_count());
1673                plan.add_detail(format!("Output: {} groups", view.row_count()));
1674                plan.add_detail(format!(
1675                    "Overall time: {:.3}ms",
1676                    group_start.elapsed().as_secs_f64() * 1000.0
1677                ));
1678                plan.end_step();
1679            }
1680        } else {
1681            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1682            if !statement.select_items.is_empty() {
1683                // Check if we have ANY non-star items (not just the first one)
1684                let has_non_star_items = statement
1685                    .select_items
1686                    .iter()
1687                    .any(|item| !matches!(item, SelectItem::Star { .. }));
1688
1689                // Apply select items if:
1690                // 1. We have computed expressions or explicit columns
1691                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1692                if has_non_star_items || statement.select_items.len() > 1 {
1693                    view = self.apply_select_items(
1694                        view,
1695                        &statement.select_items,
1696                        &statement,
1697                        exec_context,
1698                        plan,
1699                    )?;
1700                }
1701                // If it's just a single star, no projection needed
1702            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1703                debug!("QueryEngine: Using legacy columns path");
1704                // Fallback to legacy column projection for backward compatibility
1705                // Use the current view's source table, not the original table
1706                let source_table = view.source();
1707                let column_indices =
1708                    self.resolve_column_indices(source_table, &statement.columns)?;
1709                view = view.with_columns(column_indices);
1710            }
1711        }
1712
1713        // Apply DISTINCT if specified
1714        if statement.distinct {
1715            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1716            plan.set_rows_in(view.row_count());
1717            plan.add_detail(format!("Input: {} rows", view.row_count()));
1718
1719            let distinct_start = Instant::now();
1720            view = self.apply_distinct(view)?;
1721
1722            plan.set_rows_out(view.row_count());
1723            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1724            plan.add_detail(format!(
1725                "Distinct time: {:.3}ms",
1726                distinct_start.elapsed().as_secs_f64() * 1000.0
1727            ));
1728            plan.end_step();
1729        }
1730
1731        // Apply ORDER BY sorting
1732        if let Some(order_by_columns) = &statement.order_by {
1733            if !order_by_columns.is_empty() {
1734                plan.begin_step(
1735                    StepType::Sort,
1736                    format!("ORDER BY {} columns", order_by_columns.len()),
1737                );
1738                plan.set_rows_in(view.row_count());
1739                for col in order_by_columns {
1740                    // Format the expression (simplified for now - just show column name or "expr")
1741                    let expr_str = match &col.expr {
1742                        SqlExpression::Column(col_ref) => col_ref.name.clone(),
1743                        _ => "expr".to_string(),
1744                    };
1745                    plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1746                }
1747
1748                let sort_start = Instant::now();
1749                view =
1750                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1751
1752                plan.add_detail(format!(
1753                    "Sort time: {:.3}ms",
1754                    sort_start.elapsed().as_secs_f64() * 1000.0
1755                ));
1756                plan.end_step();
1757            }
1758        }
1759
1760        // Apply LIMIT/OFFSET
1761        if let Some(limit) = statement.limit {
1762            let offset = statement.offset.unwrap_or(0);
1763            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1764            plan.set_rows_in(view.row_count());
1765            if offset > 0 {
1766                plan.add_detail(format!("OFFSET: {}", offset));
1767            }
1768            view = view.with_limit(limit, offset);
1769            plan.set_rows_out(view.row_count());
1770            plan.add_detail(format!("Output: {} rows", view.row_count()));
1771            plan.end_step();
1772        }
1773
1774        // Process set operations (UNION ALL, UNION, INTERSECT, EXCEPT)
1775        if !statement.set_operations.is_empty() {
1776            plan.begin_step(
1777                StepType::SetOperation,
1778                format!("Process {} set operations", statement.set_operations.len()),
1779            );
1780            plan.set_rows_in(view.row_count());
1781
1782            // Materialize the first result set
1783            let mut combined_table = self.materialize_view(view)?;
1784            let first_columns = combined_table.column_names();
1785            let first_column_count = first_columns.len();
1786
1787            // Track if any operation requires deduplication
1788            let mut needs_deduplication = false;
1789
1790            // Process each set operation
1791            for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1792                let op_start = Instant::now();
1793                plan.begin_step(
1794                    StepType::SetOperation,
1795                    format!("{:?} operation #{}", operation, idx + 1),
1796                );
1797
1798                // Execute the next SELECT statement
1799                // We need to pass the original table and exec_context for proper resolution
1800                let next_view = if let Some(exec_ctx) = exec_context {
1801                    self.build_view_internal_with_plan_and_exec(
1802                        table.clone(),
1803                        *next_statement.clone(),
1804                        plan,
1805                        Some(exec_ctx),
1806                    )?
1807                } else {
1808                    self.build_view_internal_with_plan(
1809                        table.clone(),
1810                        *next_statement.clone(),
1811                        plan,
1812                    )?
1813                };
1814
1815                // Materialize the next result set
1816                let next_table = self.materialize_view(next_view)?;
1817                let next_columns = next_table.column_names();
1818                let next_column_count = next_columns.len();
1819
1820                // Validate schema compatibility
1821                if first_column_count != next_column_count {
1822                    return Err(anyhow!(
1823                        "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1824                        first_column_count,
1825                        idx + 2,
1826                        next_column_count
1827                    ));
1828                }
1829
1830                // Warn if column names don't match (but allow it - some SQL dialects do)
1831                for (col_idx, (first_col, next_col)) in
1832                    first_columns.iter().zip(next_columns.iter()).enumerate()
1833                {
1834                    if !first_col.eq_ignore_ascii_case(next_col) {
1835                        debug!(
1836                            "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1837                            col_idx + 1,
1838                            first_col,
1839                            next_col
1840                        );
1841                    }
1842                }
1843
1844                plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1845                plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1846
1847                // Perform the set operation
1848                match operation {
1849                    SetOperation::UnionAll => {
1850                        // UNION ALL: Simply concatenate all rows without deduplication
1851                        for row in next_table.rows.iter() {
1852                            combined_table.add_row(row.clone());
1853                        }
1854                        plan.add_detail(format!(
1855                            "Result: {} rows (no deduplication)",
1856                            combined_table.row_count()
1857                        ));
1858                    }
1859                    SetOperation::Union => {
1860                        // UNION: Concatenate all rows first, deduplicate at the end
1861                        for row in next_table.rows.iter() {
1862                            combined_table.add_row(row.clone());
1863                        }
1864                        needs_deduplication = true;
1865                        plan.add_detail(format!(
1866                            "Combined: {} rows (deduplication pending)",
1867                            combined_table.row_count()
1868                        ));
1869                    }
1870                    SetOperation::Intersect => {
1871                        // INTERSECT: Keep only rows that appear in both
1872                        // TODO: Implement intersection logic
1873                        return Err(anyhow!("INTERSECT is not yet implemented"));
1874                    }
1875                    SetOperation::Except => {
1876                        // EXCEPT: Keep only rows from left that don't appear in right
1877                        // TODO: Implement except logic
1878                        return Err(anyhow!("EXCEPT is not yet implemented"));
1879                    }
1880                }
1881
1882                plan.add_detail(format!(
1883                    "Operation time: {:.3}ms",
1884                    op_start.elapsed().as_secs_f64() * 1000.0
1885                ));
1886                plan.set_rows_out(combined_table.row_count());
1887                plan.end_step();
1888            }
1889
1890            plan.set_rows_out(combined_table.row_count());
1891            plan.add_detail(format!(
1892                "Combined result: {} rows after {} operations",
1893                combined_table.row_count(),
1894                statement.set_operations.len()
1895            ));
1896            plan.end_step();
1897
1898            // Create a new view from the combined table
1899            view = DataView::new(Arc::new(combined_table));
1900
1901            // Apply deduplication if any UNION (not UNION ALL) operation was used
1902            if needs_deduplication {
1903                plan.begin_step(
1904                    StepType::Distinct,
1905                    "UNION deduplication - remove duplicate rows".to_string(),
1906                );
1907                plan.set_rows_in(view.row_count());
1908                plan.add_detail(format!("Input: {} rows", view.row_count()));
1909
1910                let distinct_start = Instant::now();
1911                view = self.apply_distinct(view)?;
1912
1913                plan.set_rows_out(view.row_count());
1914                plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1915                plan.add_detail(format!(
1916                    "Deduplication time: {:.3}ms",
1917                    distinct_start.elapsed().as_secs_f64() * 1000.0
1918                ));
1919                plan.end_step();
1920            }
1921        }
1922
1923        Ok(view)
1924    }
1925
1926    /// Resolve column names to indices
1927    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1928        let mut indices = Vec::new();
1929        let table_columns = table.column_names();
1930
1931        for col_name in columns {
1932            let index = table_columns
1933                .iter()
1934                .position(|c| c.eq_ignore_ascii_case(col_name))
1935                .ok_or_else(|| {
1936                    let suggestion = self.find_similar_column(table, col_name);
1937                    match suggestion {
1938                        Some(similar) => anyhow::anyhow!(
1939                            "Column '{}' not found. Did you mean '{}'?",
1940                            col_name,
1941                            similar
1942                        ),
1943                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1944                    }
1945                })?;
1946            indices.push(index);
1947        }
1948
1949        Ok(indices)
1950    }
1951
1952    /// Apply SELECT items (columns and computed expressions) to create new view
1953    fn apply_select_items(
1954        &self,
1955        view: DataView,
1956        select_items: &[SelectItem],
1957        _statement: &SelectStatement,
1958        exec_context: Option<&ExecutionContext>,
1959        plan: &mut ExecutionPlanBuilder,
1960    ) -> Result<DataView> {
1961        debug!(
1962            "QueryEngine::apply_select_items - items: {:?}",
1963            select_items
1964        );
1965        debug!(
1966            "QueryEngine::apply_select_items - input view has {} rows",
1967            view.row_count()
1968        );
1969
1970        // Check if any select items contain window functions
1971        let has_window_functions = select_items.iter().any(|item| match item {
1972            SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1973            _ => false,
1974        });
1975
1976        // Count window functions for detailed reporting
1977        let window_func_count: usize = select_items
1978            .iter()
1979            .filter(|item| match item {
1980                SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1981                _ => false,
1982            })
1983            .count();
1984
1985        // Start timing for window function evaluation if present
1986        let window_start = if has_window_functions {
1987            debug!(
1988                "QueryEngine::apply_select_items - detected {} window functions",
1989                window_func_count
1990            );
1991
1992            // Extract window specs (Step 2: parallel path, not used yet)
1993            let window_specs = Self::extract_window_specs(select_items);
1994            debug!("Extracted {} window function specs", window_specs.len());
1995
1996            Some(Instant::now())
1997        } else {
1998            None
1999        };
2000
2001        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
2002        let has_unnest = select_items.iter().any(|item| match item {
2003            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
2004            _ => false,
2005        });
2006
2007        if has_unnest {
2008            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
2009            return self.apply_select_with_row_expansion(view, select_items);
2010        }
2011
2012        // Check if this is an aggregate query:
2013        // 1. At least one aggregate function exists
2014        // 2. All other items are either aggregates or constants (aggregate-compatible)
2015        let has_aggregates = select_items.iter().any(|item| match item {
2016            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
2017            SelectItem::Column { .. } => false,
2018            SelectItem::Star { .. } => false,
2019            SelectItem::StarExclude { .. } => false,
2020        });
2021
2022        let all_aggregate_compatible = select_items.iter().all(|item| match item {
2023            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
2024            SelectItem::Column { .. } => false, // Columns are not aggregate-compatible
2025            SelectItem::Star { .. } => false,   // Star is not aggregate-compatible
2026            SelectItem::StarExclude { .. } => false, // StarExclude is not aggregate-compatible
2027        });
2028
2029        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
2030            // Special handling for aggregate queries with constants (no GROUP BY)
2031            // These should produce exactly one row
2032            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
2033            return self.apply_aggregate_select(view, select_items);
2034        }
2035
2036        // Check if we need to create computed columns
2037        let has_computed_expressions = select_items
2038            .iter()
2039            .any(|item| matches!(item, SelectItem::Expression { .. }));
2040
2041        debug!(
2042            "QueryEngine::apply_select_items - has_computed_expressions: {}",
2043            has_computed_expressions
2044        );
2045
2046        if !has_computed_expressions {
2047            // Simple case: only columns, use existing projection logic
2048            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
2049            return Ok(view.with_columns(column_indices));
2050        }
2051
2052        // Complex case: we have computed expressions
2053        // IMPORTANT: We create a PROJECTED view, not a new table
2054        // This preserves the original DataTable reference
2055
2056        let source_table = view.source();
2057        let visible_rows = view.visible_row_indices();
2058
2059        // Create a temporary table just for the computed result view
2060        // But this table is only used for the current query result
2061        let mut computed_table = DataTable::new("query_result");
2062
2063        // First, expand any Star selectors to actual columns
2064        let mut expanded_items = Vec::new();
2065        for item in select_items {
2066            match item {
2067                SelectItem::Star { table_prefix, .. } => {
2068                    if let Some(prefix) = table_prefix {
2069                        // Scoped expansion: table.* expands only columns from that table
2070                        debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
2071                        for col in &source_table.columns {
2072                            if Self::column_matches_table(col, prefix) {
2073                                expanded_items.push(SelectItem::Column {
2074                                    column: ColumnRef::unquoted(col.name.clone()),
2075                                    leading_comments: vec![],
2076                                    trailing_comment: None,
2077                                });
2078                            }
2079                        }
2080                    } else {
2081                        // Unscoped expansion: * expands to all columns
2082                        debug!("QueryEngine::apply_select_items - expanding *");
2083                        for col_name in source_table.column_names() {
2084                            expanded_items.push(SelectItem::Column {
2085                                column: ColumnRef::unquoted(col_name.to_string()),
2086                                leading_comments: vec![],
2087                                trailing_comment: None,
2088                            });
2089                        }
2090                    }
2091                }
2092                _ => expanded_items.push(item.clone()),
2093            }
2094        }
2095
2096        // Add columns based on expanded SelectItems, handling duplicates
2097        let mut column_name_counts: std::collections::HashMap<String, usize> =
2098            std::collections::HashMap::new();
2099
2100        for item in &expanded_items {
2101            let base_name = match item {
2102                SelectItem::Column {
2103                    column: col_ref, ..
2104                } => col_ref.name.clone(),
2105                SelectItem::Expression { alias, .. } => alias.clone(),
2106                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2107                SelectItem::StarExclude { .. } => {
2108                    unreachable!("StarExclude should have been expanded")
2109                }
2110            };
2111
2112            // Check if this column name has been used before
2113            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
2114            let column_name = if *count == 0 {
2115                // First occurrence, use the name as-is
2116                base_name.clone()
2117            } else {
2118                // Duplicate, append a suffix
2119                format!("{base_name}_{count}")
2120            };
2121            *count += 1;
2122
2123            computed_table.add_column(DataColumn::new(&column_name));
2124        }
2125
2126        // Check if batch evaluation can be used
2127        // Batch evaluation is the default but we need to check if all window functions
2128        // are standalone (not embedded in expressions)
2129        let can_use_batch = expanded_items.iter().all(|item| {
2130            match item {
2131                SelectItem::Expression { expr, .. } => {
2132                    // Only use batch evaluation if the expression IS a window function,
2133                    // not if it CONTAINS a window function
2134                    matches!(expr, SqlExpression::WindowFunction { .. })
2135                        || !Self::contains_window_function(expr)
2136                }
2137                _ => true, // Non-expressions are fine
2138            }
2139        });
2140
2141        // Batch evaluation is now the default mode for improved performance
2142        // Users can opt-out by setting SQL_CLI_BATCH_WINDOW=0 or false
2143        let use_batch_evaluation = can_use_batch
2144            && std::env::var("SQL_CLI_BATCH_WINDOW")
2145                .map(|v| v != "0" && v.to_lowercase() != "false")
2146                .unwrap_or(true);
2147
2148        // Store window specs for batch evaluation if needed
2149        let batch_window_specs = if use_batch_evaluation && has_window_functions {
2150            debug!("BATCH window function evaluation flag is enabled");
2151            // Extract window specs before timing starts
2152            let specs = Self::extract_window_specs(&expanded_items);
2153            debug!(
2154                "Extracted {} window function specs for batch evaluation",
2155                specs.len()
2156            );
2157            Some(specs)
2158        } else {
2159            None
2160        };
2161
2162        // Calculate values for each row
2163        let mut evaluator =
2164            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2165
2166        // Populate table aliases from exec_context if available
2167        if let Some(exec_ctx) = exec_context {
2168            let aliases = exec_ctx.get_aliases();
2169            if !aliases.is_empty() {
2170                debug!(
2171                    "Applying {} aliases to evaluator: {:?}",
2172                    aliases.len(),
2173                    aliases
2174                );
2175                evaluator = evaluator.with_table_aliases(aliases);
2176            }
2177        }
2178
2179        // OPTIMIZATION: Pre-create WindowContexts before the row loop
2180        // This avoids 50,000+ redundant context lookups
2181        if has_window_functions {
2182            let preload_start = Instant::now();
2183
2184            // Extract all unique WindowSpecs from SELECT items
2185            let mut window_specs = Vec::new();
2186            for item in &expanded_items {
2187                if let SelectItem::Expression { expr, .. } = item {
2188                    Self::collect_window_specs(expr, &mut window_specs);
2189                }
2190            }
2191
2192            // Pre-create all WindowContexts
2193            for spec in &window_specs {
2194                let _ = evaluator.get_or_create_window_context(spec);
2195            }
2196
2197            debug!(
2198                "Pre-created {} WindowContext(s) in {:.2}ms",
2199                window_specs.len(),
2200                preload_start.elapsed().as_secs_f64() * 1000.0
2201            );
2202        }
2203
2204        // Batch evaluation path for window functions
2205        if let Some(window_specs) = batch_window_specs {
2206            debug!("Starting batch window function evaluation");
2207            let batch_start = Instant::now();
2208
2209            // Initialize result table with all rows
2210            let mut batch_results: Vec<Vec<DataValue>> =
2211                vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2212
2213            // Use the window specs we extracted earlier
2214            let detailed_window_specs = &window_specs;
2215
2216            // Group window specs by their WindowSpec for batch processing
2217            let mut specs_by_window: HashMap<
2218                u64,
2219                Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2220            > = HashMap::new();
2221            for spec in detailed_window_specs {
2222                let hash = spec.spec.compute_hash();
2223                specs_by_window
2224                    .entry(hash)
2225                    .or_insert_with(Vec::new)
2226                    .push(spec);
2227            }
2228
2229            // Process each unique window specification
2230            for (_window_hash, specs) in specs_by_window {
2231                // Get the window context (already pre-created)
2232                let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2233
2234                // Process each function using this window
2235                for spec in specs {
2236                    match spec.function_name.as_str() {
2237                        "LAG" => {
2238                            // Extract column and offset from arguments
2239                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2240                                let column_name = col_ref.name.as_str();
2241                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2242                                    spec.args.get(1)
2243                                {
2244                                    n.parse::<i64>().unwrap_or(1)
2245                                } else {
2246                                    1 // default offset
2247                                };
2248
2249                                let values = context.evaluate_lag_batch(
2250                                    visible_rows,
2251                                    column_name,
2252                                    offset,
2253                                )?;
2254
2255                                // Write results to the output column
2256                                for (row_idx, value) in values.into_iter().enumerate() {
2257                                    batch_results[row_idx][spec.output_column_index] = value;
2258                                }
2259                            }
2260                        }
2261                        "LEAD" => {
2262                            // Extract column and offset from arguments
2263                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2264                                let column_name = col_ref.name.as_str();
2265                                let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2266                                    spec.args.get(1)
2267                                {
2268                                    n.parse::<i64>().unwrap_or(1)
2269                                } else {
2270                                    1 // default offset
2271                                };
2272
2273                                let values = context.evaluate_lead_batch(
2274                                    visible_rows,
2275                                    column_name,
2276                                    offset,
2277                                )?;
2278
2279                                // Write results to the output column
2280                                for (row_idx, value) in values.into_iter().enumerate() {
2281                                    batch_results[row_idx][spec.output_column_index] = value;
2282                                }
2283                            }
2284                        }
2285                        "ROW_NUMBER" => {
2286                            let values = context.evaluate_row_number_batch(visible_rows)?;
2287
2288                            // Write results to the output column
2289                            for (row_idx, value) in values.into_iter().enumerate() {
2290                                batch_results[row_idx][spec.output_column_index] = value;
2291                            }
2292                        }
2293                        "RANK" => {
2294                            let values = context.evaluate_rank_batch(visible_rows)?;
2295
2296                            // Write results to the output column
2297                            for (row_idx, value) in values.into_iter().enumerate() {
2298                                batch_results[row_idx][spec.output_column_index] = value;
2299                            }
2300                        }
2301                        "DENSE_RANK" => {
2302                            let values = context.evaluate_dense_rank_batch(visible_rows)?;
2303
2304                            // Write results to the output column
2305                            for (row_idx, value) in values.into_iter().enumerate() {
2306                                batch_results[row_idx][spec.output_column_index] = value;
2307                            }
2308                        }
2309                        "SUM" => {
2310                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2311                                let column_name = col_ref.name.as_str();
2312                                let values =
2313                                    context.evaluate_sum_batch(visible_rows, column_name)?;
2314
2315                                for (row_idx, value) in values.into_iter().enumerate() {
2316                                    batch_results[row_idx][spec.output_column_index] = value;
2317                                }
2318                            }
2319                        }
2320                        "AVG" => {
2321                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2322                                let column_name = col_ref.name.as_str();
2323                                let values =
2324                                    context.evaluate_avg_batch(visible_rows, column_name)?;
2325
2326                                for (row_idx, value) in values.into_iter().enumerate() {
2327                                    batch_results[row_idx][spec.output_column_index] = value;
2328                                }
2329                            }
2330                        }
2331                        "MIN" => {
2332                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2333                                let column_name = col_ref.name.as_str();
2334                                let values =
2335                                    context.evaluate_min_batch(visible_rows, column_name)?;
2336
2337                                for (row_idx, value) in values.into_iter().enumerate() {
2338                                    batch_results[row_idx][spec.output_column_index] = value;
2339                                }
2340                            }
2341                        }
2342                        "MAX" => {
2343                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2344                                let column_name = col_ref.name.as_str();
2345                                let values =
2346                                    context.evaluate_max_batch(visible_rows, column_name)?;
2347
2348                                for (row_idx, value) in values.into_iter().enumerate() {
2349                                    batch_results[row_idx][spec.output_column_index] = value;
2350                                }
2351                            }
2352                        }
2353                        "COUNT" => {
2354                            // COUNT can be COUNT(*) or COUNT(column)
2355                            let column_name = match spec.args.get(0) {
2356                                Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2357                                Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2358                                _ => None,
2359                            };
2360
2361                            let values = context.evaluate_count_batch(visible_rows, column_name)?;
2362
2363                            for (row_idx, value) in values.into_iter().enumerate() {
2364                                batch_results[row_idx][spec.output_column_index] = value;
2365                            }
2366                        }
2367                        "FIRST_VALUE" => {
2368                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2369                                let column_name = col_ref.name.as_str();
2370                                let values = context
2371                                    .evaluate_first_value_batch(visible_rows, column_name)?;
2372
2373                                for (row_idx, value) in values.into_iter().enumerate() {
2374                                    batch_results[row_idx][spec.output_column_index] = value;
2375                                }
2376                            }
2377                        }
2378                        "LAST_VALUE" => {
2379                            if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2380                                let column_name = col_ref.name.as_str();
2381                                let values =
2382                                    context.evaluate_last_value_batch(visible_rows, column_name)?;
2383
2384                                for (row_idx, value) in values.into_iter().enumerate() {
2385                                    batch_results[row_idx][spec.output_column_index] = value;
2386                                }
2387                            }
2388                        }
2389                        _ => {
2390                            // Fall back to per-row evaluation for unsupported functions
2391                            debug!(
2392                                "Window function {} not supported in batch mode, using per-row",
2393                                spec.function_name
2394                            );
2395                        }
2396                    }
2397                }
2398            }
2399
2400            // Now evaluate non-window columns
2401            for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2402                for (col_idx, item) in expanded_items.iter().enumerate() {
2403                    // Skip if this column was already filled by a window function
2404                    if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2405                        continue;
2406                    }
2407
2408                    let value = match item {
2409                        SelectItem::Column {
2410                            column: col_ref, ..
2411                        } => {
2412                            match evaluator
2413                                .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2414                            {
2415                                Ok(val) => val,
2416                                Err(e) => {
2417                                    return Err(anyhow!(
2418                                        "Failed to evaluate column {}: {}",
2419                                        col_ref.to_sql(),
2420                                        e
2421                                    ));
2422                                }
2423                            }
2424                        }
2425                        SelectItem::Expression { expr, .. } => {
2426                            // For batch evaluation, we need to handle expressions differently
2427                            // If this is just a window function by itself, we already computed it
2428                            if matches!(expr, SqlExpression::WindowFunction { .. }) {
2429                                // Pure window function - already handled in batch evaluation
2430                                continue;
2431                            }
2432                            // For expressions containing window functions or regular expressions,
2433                            // evaluate them normally
2434                            evaluator.evaluate(&expr, source_row_idx)?
2435                        }
2436                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2437                        SelectItem::StarExclude { .. } => {
2438                            unreachable!("StarExclude should have been expanded")
2439                        }
2440                    };
2441                    batch_results[result_row_idx][col_idx] = value;
2442                }
2443            }
2444
2445            // Add all rows to the table
2446            for row_values in batch_results {
2447                computed_table
2448                    .add_row(DataRow::new(row_values))
2449                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2450            }
2451
2452            debug!(
2453                "Batch window evaluation completed in {:.3}ms",
2454                batch_start.elapsed().as_secs_f64() * 1000.0
2455            );
2456        } else {
2457            // Original per-row evaluation path
2458            for &row_idx in visible_rows {
2459                let mut row_values = Vec::new();
2460
2461                for item in &expanded_items {
2462                    let value = match item {
2463                        SelectItem::Column {
2464                            column: col_ref, ..
2465                        } => {
2466                            // Use evaluator for column resolution (handles aliases properly)
2467                            match evaluator
2468                                .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2469                            {
2470                                Ok(val) => val,
2471                                Err(e) => {
2472                                    return Err(anyhow!(
2473                                        "Failed to evaluate column {}: {}",
2474                                        col_ref.to_sql(),
2475                                        e
2476                                    ));
2477                                }
2478                            }
2479                        }
2480                        SelectItem::Expression { expr, .. } => {
2481                            // Computed expression
2482                            evaluator.evaluate(&expr, row_idx)?
2483                        }
2484                        SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2485                        SelectItem::StarExclude { .. } => {
2486                            unreachable!("StarExclude should have been expanded")
2487                        }
2488                    };
2489                    row_values.push(value);
2490                }
2491
2492                computed_table
2493                    .add_row(DataRow::new(row_values))
2494                    .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2495            }
2496        }
2497
2498        // Log window function timing if applicable
2499        if let Some(start) = window_start {
2500            let window_duration = start.elapsed();
2501            info!(
2502                "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2503                window_duration.as_secs_f64() * 1000.0,
2504                visible_rows.len(),
2505                window_func_count
2506            );
2507
2508            // Add to execution plan
2509            plan.begin_step(
2510                StepType::WindowFunction,
2511                format!("Evaluate {} window function(s)", window_func_count),
2512            );
2513            plan.set_rows_in(visible_rows.len());
2514            plan.set_rows_out(visible_rows.len());
2515            plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2516            plan.add_detail(format!("{} window functions evaluated", window_func_count));
2517            plan.add_detail(format!(
2518                "Evaluation time: {:.3}ms",
2519                window_duration.as_secs_f64() * 1000.0
2520            ));
2521            plan.end_step();
2522        }
2523
2524        // Return a view of the computed result
2525        // This is a temporary view for this query only
2526        Ok(DataView::new(Arc::new(computed_table)))
2527    }
2528
2529    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
2530    fn apply_select_with_row_expansion(
2531        &self,
2532        view: DataView,
2533        select_items: &[SelectItem],
2534    ) -> Result<DataView> {
2535        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2536
2537        let source_table = view.source();
2538        let visible_rows = view.visible_row_indices();
2539        let expander_registry = RowExpanderRegistry::new();
2540
2541        // Create result table
2542        let mut result_table = DataTable::new("unnest_result");
2543
2544        // Expand * to columns and set up result columns
2545        let mut expanded_items = Vec::new();
2546        for item in select_items {
2547            match item {
2548                SelectItem::Star { table_prefix, .. } => {
2549                    if let Some(prefix) = table_prefix {
2550                        // Scoped expansion: table.* expands only columns from that table
2551                        debug!(
2552                            "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2553                            prefix
2554                        );
2555                        for col in &source_table.columns {
2556                            if Self::column_matches_table(col, prefix) {
2557                                expanded_items.push(SelectItem::Column {
2558                                    column: ColumnRef::unquoted(col.name.clone()),
2559                                    leading_comments: vec![],
2560                                    trailing_comment: None,
2561                                });
2562                            }
2563                        }
2564                    } else {
2565                        // Unscoped expansion: * expands to all columns
2566                        debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2567                        for col_name in source_table.column_names() {
2568                            expanded_items.push(SelectItem::Column {
2569                                column: ColumnRef::unquoted(col_name.to_string()),
2570                                leading_comments: vec![],
2571                                trailing_comment: None,
2572                            });
2573                        }
2574                    }
2575                }
2576                _ => expanded_items.push(item.clone()),
2577            }
2578        }
2579
2580        // Add columns to result table
2581        for item in &expanded_items {
2582            let column_name = match item {
2583                SelectItem::Column {
2584                    column: col_ref, ..
2585                } => col_ref.name.clone(),
2586                SelectItem::Expression { alias, .. } => alias.clone(),
2587                SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2588                SelectItem::StarExclude { .. } => {
2589                    unreachable!("StarExclude should have been expanded")
2590                }
2591            };
2592            result_table.add_column(DataColumn::new(&column_name));
2593        }
2594
2595        // Process each input row
2596        let mut evaluator =
2597            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2598
2599        for &row_idx in visible_rows {
2600            // First pass: identify UNNEST expressions and collect their expansion arrays
2601            let mut unnest_expansions = Vec::new();
2602            let mut unnest_indices = Vec::new();
2603
2604            for (col_idx, item) in expanded_items.iter().enumerate() {
2605                if let SelectItem::Expression { expr, .. } = item {
2606                    if let Some(expansion_result) = self.try_expand_unnest(
2607                        &expr,
2608                        source_table,
2609                        row_idx,
2610                        &mut evaluator,
2611                        &expander_registry,
2612                    )? {
2613                        unnest_expansions.push(expansion_result);
2614                        unnest_indices.push(col_idx);
2615                    }
2616                }
2617            }
2618
2619            // Determine how many output rows to generate
2620            let expansion_count = if unnest_expansions.is_empty() {
2621                1 // No UNNEST, just one row
2622            } else {
2623                unnest_expansions
2624                    .iter()
2625                    .map(|exp| exp.row_count())
2626                    .max()
2627                    .unwrap_or(1)
2628            };
2629
2630            // Generate output rows
2631            for output_idx in 0..expansion_count {
2632                let mut row_values = Vec::new();
2633
2634                for (col_idx, item) in expanded_items.iter().enumerate() {
2635                    // Check if this column is an UNNEST column
2636                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2637
2638                    let value = if let Some(unnest_idx) = unnest_position {
2639                        // Get value from expansion array (or NULL if exhausted)
2640                        let expansion = &unnest_expansions[unnest_idx];
2641                        expansion
2642                            .values
2643                            .get(output_idx)
2644                            .cloned()
2645                            .unwrap_or(DataValue::Null)
2646                    } else {
2647                        // Regular column or non-UNNEST expression - replicate from input
2648                        match item {
2649                            SelectItem::Column {
2650                                column: col_ref, ..
2651                            } => {
2652                                let col_idx =
2653                                    source_table.get_column_index(&col_ref.name).ok_or_else(
2654                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2655                                    )?;
2656                                let row = source_table
2657                                    .get_row(row_idx)
2658                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2659                                row.get(col_idx)
2660                                    .ok_or_else(|| {
2661                                        anyhow::anyhow!("Column {} not found in row", col_idx)
2662                                    })?
2663                                    .clone()
2664                            }
2665                            SelectItem::Expression { expr, .. } => {
2666                                // Non-UNNEST expression - evaluate once and replicate
2667                                evaluator.evaluate(&expr, row_idx)?
2668                            }
2669                            SelectItem::Star { .. } => unreachable!(),
2670                            SelectItem::StarExclude { .. } => {
2671                                unreachable!("StarExclude should have been expanded")
2672                            }
2673                        }
2674                    };
2675
2676                    row_values.push(value);
2677                }
2678
2679                result_table
2680                    .add_row(DataRow::new(row_values))
2681                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2682            }
2683        }
2684
2685        debug!(
2686            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2687            visible_rows.len(),
2688            result_table.row_count()
2689        );
2690
2691        Ok(DataView::new(Arc::new(result_table)))
2692    }
2693
2694    /// Try to expand an expression if it's an UNNEST call
2695    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
2696    fn try_expand_unnest(
2697        &self,
2698        expr: &SqlExpression,
2699        _source_table: &DataTable,
2700        row_idx: usize,
2701        evaluator: &mut ArithmeticEvaluator,
2702        expander_registry: &RowExpanderRegistry,
2703    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2704        // Check for UNNEST variant (direct syntax)
2705        if let SqlExpression::Unnest { column, delimiter } = expr {
2706            // Evaluate the column expression
2707            let column_value = evaluator.evaluate(column, row_idx)?;
2708
2709            // Delimiter is already a string literal
2710            let delimiter_value = DataValue::String(delimiter.clone());
2711
2712            // Get the UNNEST expander
2713            let expander = expander_registry
2714                .get("UNNEST")
2715                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2716
2717            // Expand the value
2718            let expansion = expander.expand(&column_value, &[delimiter_value])?;
2719            return Ok(Some(expansion));
2720        }
2721
2722        // Also check for FunctionCall form (for compatibility)
2723        if let SqlExpression::FunctionCall { name, args, .. } = expr {
2724            if name.to_uppercase() == "UNNEST" {
2725                // UNNEST(column, delimiter)
2726                if args.len() != 2 {
2727                    return Err(anyhow::anyhow!(
2728                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2729                    ));
2730                }
2731
2732                // Evaluate the column expression (first arg)
2733                let column_value = evaluator.evaluate(&args[0], row_idx)?;
2734
2735                // Evaluate the delimiter expression (second arg)
2736                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2737
2738                // Get the UNNEST expander
2739                let expander = expander_registry
2740                    .get("UNNEST")
2741                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2742
2743                // Expand the value
2744                let expansion = expander.expand(&column_value, &[delimiter_value])?;
2745                return Ok(Some(expansion));
2746            }
2747        }
2748
2749        Ok(None)
2750    }
2751
2752    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
2753    fn apply_aggregate_select(
2754        &self,
2755        view: DataView,
2756        select_items: &[SelectItem],
2757    ) -> Result<DataView> {
2758        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2759
2760        let source_table = view.source();
2761        let mut result_table = DataTable::new("aggregate_result");
2762
2763        // Add columns for each select item
2764        for item in select_items {
2765            let column_name = match item {
2766                SelectItem::Expression { alias, .. } => alias.clone(),
2767                _ => unreachable!("Should only have expressions in aggregate-only query"),
2768            };
2769            result_table.add_column(DataColumn::new(&column_name));
2770        }
2771
2772        // Create evaluator with visible rows from the view (for filtered aggregates)
2773        let visible_rows = view.visible_row_indices().to_vec();
2774        let mut evaluator =
2775            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2776                .with_visible_rows(visible_rows);
2777
2778        // Evaluate each aggregate expression once (they handle all rows internally)
2779        let mut row_values = Vec::new();
2780        for item in select_items {
2781            match item {
2782                SelectItem::Expression { expr, .. } => {
2783                    // The evaluator will handle aggregates over all rows
2784                    // We pass row_index=0 but aggregates ignore it and process all rows
2785                    let value = evaluator.evaluate(expr, 0)?;
2786                    row_values.push(value);
2787                }
2788                _ => unreachable!("Should only have expressions in aggregate-only query"),
2789            }
2790        }
2791
2792        // Add the single result row
2793        result_table
2794            .add_row(DataRow::new(row_values))
2795            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2796
2797        Ok(DataView::new(Arc::new(result_table)))
2798    }
2799
2800    /// Check if a column belongs to a specific table based on source_table or qualified_name
2801    ///
2802    /// This is used for table-scoped star expansion (e.g., `SELECT user.*`)
2803    /// to filter which columns should be included.
2804    ///
2805    /// # Arguments
2806    /// * `col` - The column to check
2807    /// * `table_name` - The table name or alias to match against
2808    ///
2809    /// # Returns
2810    /// `true` if the column belongs to the specified table
2811    fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2812        // First, check the source_table field
2813        if let Some(ref source) = col.source_table {
2814            // Direct match or matches with schema qualification
2815            if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2816                return true;
2817            }
2818        }
2819
2820        // Second, check the qualified_name field
2821        if let Some(ref qualified) = col.qualified_name {
2822            // Check if qualified name starts with "table_name."
2823            if qualified.starts_with(&format!("{}.", table_name)) {
2824                return true;
2825            }
2826        }
2827
2828        false
2829    }
2830
2831    /// Resolve `SelectItem` columns to indices (for simple column projections only)
2832    fn resolve_select_columns(
2833        &self,
2834        table: &DataTable,
2835        select_items: &[SelectItem],
2836    ) -> Result<Vec<usize>> {
2837        let mut indices = Vec::new();
2838        let table_columns = table.column_names();
2839
2840        for item in select_items {
2841            match item {
2842                SelectItem::Column {
2843                    column: col_ref, ..
2844                } => {
2845                    // Check if this has a table prefix
2846                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
2847                        // For qualified references, ONLY try qualified lookup - no fallback
2848                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2849                        table.find_column_by_qualified_name(&qualified_name)
2850                            .ok_or_else(|| {
2851                                // Check if any columns have qualified names for better error message
2852                                let has_qualified = table.columns.iter()
2853                                    .any(|c| c.qualified_name.is_some());
2854                                if !has_qualified {
2855                                    anyhow::anyhow!(
2856                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2857                                        qualified_name, table_prefix
2858                                    )
2859                                } else {
2860                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
2861                                }
2862                            })?
2863                    } else {
2864                        // Simple column name lookup
2865                        table_columns
2866                            .iter()
2867                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2868                            .ok_or_else(|| {
2869                                let suggestion = self.find_similar_column(table, &col_ref.name);
2870                                match suggestion {
2871                                    Some(similar) => anyhow::anyhow!(
2872                                        "Column '{}' not found. Did you mean '{}'?",
2873                                        col_ref.name,
2874                                        similar
2875                                    ),
2876                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2877                                }
2878                            })?
2879                    };
2880                    indices.push(index);
2881                }
2882                SelectItem::Star { table_prefix, .. } => {
2883                    if let Some(prefix) = table_prefix {
2884                        // Scoped expansion: table.* expands only columns from that table
2885                        for (i, col) in table.columns.iter().enumerate() {
2886                            if Self::column_matches_table(col, prefix) {
2887                                indices.push(i);
2888                            }
2889                        }
2890                    } else {
2891                        // Unscoped expansion: * expands to all column indices
2892                        for i in 0..table_columns.len() {
2893                            indices.push(i);
2894                        }
2895                    }
2896                }
2897                SelectItem::StarExclude {
2898                    table_prefix,
2899                    excluded_columns,
2900                    ..
2901                } => {
2902                    // Expand all columns (with optional table prefix), then exclude specified ones
2903                    if let Some(prefix) = table_prefix {
2904                        // Scoped expansion: table.* EXCLUDE expands only columns from that table
2905                        for (i, col) in table.columns.iter().enumerate() {
2906                            if Self::column_matches_table(col, prefix)
2907                                && !excluded_columns.contains(&col.name)
2908                            {
2909                                indices.push(i);
2910                            }
2911                        }
2912                    } else {
2913                        // Unscoped expansion: * EXCLUDE expands to all columns except excluded ones
2914                        for (i, col_name) in table_columns.iter().enumerate() {
2915                            if !excluded_columns
2916                                .iter()
2917                                .any(|exc| exc.eq_ignore_ascii_case(col_name))
2918                            {
2919                                indices.push(i);
2920                            }
2921                        }
2922                    }
2923                }
2924                SelectItem::Expression { .. } => {
2925                    return Err(anyhow::anyhow!(
2926                        "Computed expressions require new table creation"
2927                    ));
2928                }
2929            }
2930        }
2931
2932        Ok(indices)
2933    }
2934
2935    /// Apply DISTINCT to remove duplicate rows
2936    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2937        use std::collections::HashSet;
2938
2939        let source = view.source();
2940        let visible_cols = view.visible_column_indices();
2941        let visible_rows = view.visible_row_indices();
2942
2943        // Build a set to track unique rows
2944        let mut seen_rows = HashSet::new();
2945        let mut unique_row_indices = Vec::new();
2946
2947        for &row_idx in visible_rows {
2948            // Build a key representing this row's visible column values
2949            let mut row_key = Vec::new();
2950            for &col_idx in visible_cols {
2951                let value = source
2952                    .get_value(row_idx, col_idx)
2953                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2954                // Convert value to a hashable representation
2955                row_key.push(format!("{:?}", value));
2956            }
2957
2958            // Check if we've seen this row before
2959            if seen_rows.insert(row_key) {
2960                // First time seeing this row combination
2961                unique_row_indices.push(row_idx);
2962            }
2963        }
2964
2965        // Create a new view with only unique rows
2966        Ok(view.with_rows(unique_row_indices))
2967    }
2968
2969    /// Apply multi-column ORDER BY sorting to the view
2970    fn apply_multi_order_by(
2971        &self,
2972        view: DataView,
2973        order_by_columns: &[OrderByItem],
2974    ) -> Result<DataView> {
2975        self.apply_multi_order_by_with_context(view, order_by_columns, None)
2976    }
2977
2978    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
2979    fn apply_multi_order_by_with_context(
2980        &self,
2981        mut view: DataView,
2982        order_by_columns: &[OrderByItem],
2983        _exec_context: Option<&ExecutionContext>,
2984    ) -> Result<DataView> {
2985        // Build list of (source_column_index, ascending) tuples
2986        let mut sort_columns = Vec::new();
2987
2988        for order_col in order_by_columns {
2989            // Extract column name from expression (currently only supports simple columns)
2990            let column_name = match &order_col.expr {
2991                SqlExpression::Column(col_ref) => col_ref.name.clone(),
2992                _ => {
2993                    // TODO: Support expression evaluation in ORDER BY
2994                    return Err(anyhow!(
2995                        "ORDER BY expressions not yet supported - only simple columns allowed"
2996                    ));
2997                }
2998            };
2999
3000            // Try to find the column index, handling qualified column names (table.column)
3001            let col_index = if column_name.contains('.') {
3002                // Qualified column name - extract unqualified part
3003                if let Some(dot_pos) = column_name.rfind('.') {
3004                    let col_name = &column_name[dot_pos + 1..];
3005
3006                    // After SELECT processing, columns are unqualified
3007                    // So just use the column name part
3008                    debug!(
3009                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
3010                        col_name, column_name
3011                    );
3012                    view.source().get_column_index(col_name)
3013                } else {
3014                    view.source().get_column_index(&column_name)
3015                }
3016            } else {
3017                // Simple column name
3018                view.source().get_column_index(&column_name)
3019            }
3020            .ok_or_else(|| {
3021                // If not found, provide helpful error with suggestions
3022                let suggestion = self.find_similar_column(view.source(), &column_name);
3023                match suggestion {
3024                    Some(similar) => anyhow::anyhow!(
3025                        "Column '{}' not found. Did you mean '{}'?",
3026                        column_name,
3027                        similar
3028                    ),
3029                    None => {
3030                        // Also list available columns for debugging
3031                        let available_cols = view.source().column_names().join(", ");
3032                        anyhow::anyhow!(
3033                            "Column '{}' not found. Available columns: {}",
3034                            column_name,
3035                            available_cols
3036                        )
3037                    }
3038                }
3039            })?;
3040
3041            let ascending = matches!(order_col.direction, SortDirection::Asc);
3042            sort_columns.push((col_index, ascending));
3043        }
3044
3045        // Apply multi-column sorting
3046        view.apply_multi_sort(&sort_columns)?;
3047        Ok(view)
3048    }
3049
3050    /// Apply GROUP BY to the view with optional HAVING clause
3051    fn apply_group_by(
3052        &self,
3053        view: DataView,
3054        group_by_exprs: &[SqlExpression],
3055        select_items: &[SelectItem],
3056        having: Option<&SqlExpression>,
3057        plan: &mut ExecutionPlanBuilder,
3058    ) -> Result<DataView> {
3059        // Use the new expression-based GROUP BY implementation
3060        let (result_view, phase_info) = self.apply_group_by_expressions(
3061            view,
3062            group_by_exprs,
3063            select_items,
3064            having,
3065            self.case_insensitive,
3066            self.date_notation.clone(),
3067        )?;
3068
3069        // Add detailed phase information to the execution plan
3070        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
3071        plan.add_detail(format!(
3072            "Phase 1 - Group Building: {:.3}ms",
3073            phase_info.phase2_key_building.as_secs_f64() * 1000.0
3074        ));
3075        plan.add_detail(format!(
3076            "  • Processing {} rows into {} groups",
3077            phase_info.total_rows, phase_info.num_groups
3078        ));
3079        plan.add_detail(format!(
3080            "Phase 2 - Aggregation: {:.3}ms",
3081            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
3082        ));
3083        if phase_info.phase4_having_evaluation > Duration::ZERO {
3084            plan.add_detail(format!(
3085                "Phase 3 - HAVING Filter: {:.3}ms",
3086                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
3087            ));
3088            plan.add_detail(format!(
3089                "  • Filtered {} groups",
3090                phase_info.groups_filtered_by_having
3091            ));
3092        }
3093        plan.add_detail(format!(
3094            "Total GROUP BY time: {:.3}ms",
3095            phase_info.total_time.as_secs_f64() * 1000.0
3096        ));
3097
3098        Ok(result_view)
3099    }
3100
3101    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
3102    /// This helps pre-size hash tables for better performance
3103    pub fn estimate_group_cardinality(
3104        &self,
3105        view: &DataView,
3106        group_by_exprs: &[SqlExpression],
3107    ) -> usize {
3108        // If we have few rows, just return the row count as upper bound
3109        let row_count = view.get_visible_rows().len();
3110        if row_count <= 100 {
3111            return row_count;
3112        }
3113
3114        // Sample first 1000 rows or 10% of data, whichever is smaller
3115        let sample_size = min(1000, row_count / 10).max(100);
3116        let mut seen = FxHashSet::default();
3117
3118        let visible_rows = view.get_visible_rows();
3119        for (i, &row_idx) in visible_rows.iter().enumerate() {
3120            if i >= sample_size {
3121                break;
3122            }
3123
3124            // Evaluate GROUP BY expressions for this row
3125            let mut key_values = Vec::new();
3126            for expr in group_by_exprs {
3127                let mut evaluator = ArithmeticEvaluator::new(view.source());
3128                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
3129                key_values.push(value);
3130            }
3131
3132            seen.insert(key_values);
3133        }
3134
3135        // Estimate total cardinality based on sample
3136        let sample_cardinality = seen.len();
3137        let estimated = (sample_cardinality * row_count) / sample_size;
3138
3139        // Cap at row count and ensure minimum of sample cardinality
3140        estimated.min(row_count).max(sample_cardinality)
3141    }
3142}
3143
3144#[cfg(test)]
3145mod tests {
3146    use super::*;
3147    use crate::data::datatable::{DataColumn, DataRow, DataValue};
3148
3149    fn create_test_table() -> Arc<DataTable> {
3150        let mut table = DataTable::new("test");
3151
3152        // Add columns
3153        table.add_column(DataColumn::new("id"));
3154        table.add_column(DataColumn::new("name"));
3155        table.add_column(DataColumn::new("age"));
3156
3157        // Add rows
3158        table
3159            .add_row(DataRow::new(vec![
3160                DataValue::Integer(1),
3161                DataValue::String("Alice".to_string()),
3162                DataValue::Integer(30),
3163            ]))
3164            .unwrap();
3165
3166        table
3167            .add_row(DataRow::new(vec![
3168                DataValue::Integer(2),
3169                DataValue::String("Bob".to_string()),
3170                DataValue::Integer(25),
3171            ]))
3172            .unwrap();
3173
3174        table
3175            .add_row(DataRow::new(vec![
3176                DataValue::Integer(3),
3177                DataValue::String("Charlie".to_string()),
3178                DataValue::Integer(35),
3179            ]))
3180            .unwrap();
3181
3182        Arc::new(table)
3183    }
3184
3185    #[test]
3186    fn test_select_all() {
3187        let table = create_test_table();
3188        let engine = QueryEngine::new();
3189
3190        let view = engine
3191            .execute(table.clone(), "SELECT * FROM users")
3192            .unwrap();
3193        assert_eq!(view.row_count(), 3);
3194        assert_eq!(view.column_count(), 3);
3195    }
3196
3197    #[test]
3198    fn test_select_columns() {
3199        let table = create_test_table();
3200        let engine = QueryEngine::new();
3201
3202        let view = engine
3203            .execute(table.clone(), "SELECT name, age FROM users")
3204            .unwrap();
3205        assert_eq!(view.row_count(), 3);
3206        assert_eq!(view.column_count(), 2);
3207    }
3208
3209    #[test]
3210    fn test_select_with_limit() {
3211        let table = create_test_table();
3212        let engine = QueryEngine::new();
3213
3214        let view = engine
3215            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3216            .unwrap();
3217        assert_eq!(view.row_count(), 2);
3218    }
3219
3220    #[test]
3221    fn test_type_coercion_contains() {
3222        // Initialize tracing for debug output
3223        let _ = tracing_subscriber::fmt()
3224            .with_max_level(tracing::Level::DEBUG)
3225            .try_init();
3226
3227        let mut table = DataTable::new("test");
3228        table.add_column(DataColumn::new("id"));
3229        table.add_column(DataColumn::new("status"));
3230        table.add_column(DataColumn::new("price"));
3231
3232        // Add test data with mixed types
3233        table
3234            .add_row(DataRow::new(vec![
3235                DataValue::Integer(1),
3236                DataValue::String("Pending".to_string()),
3237                DataValue::Float(99.99),
3238            ]))
3239            .unwrap();
3240
3241        table
3242            .add_row(DataRow::new(vec![
3243                DataValue::Integer(2),
3244                DataValue::String("Confirmed".to_string()),
3245                DataValue::Float(150.50),
3246            ]))
3247            .unwrap();
3248
3249        table
3250            .add_row(DataRow::new(vec![
3251                DataValue::Integer(3),
3252                DataValue::String("Pending".to_string()),
3253                DataValue::Float(75.00),
3254            ]))
3255            .unwrap();
3256
3257        let table = Arc::new(table);
3258        let engine = QueryEngine::new();
3259
3260        println!("\n=== Testing WHERE clause with Contains ===");
3261        println!("Table has {} rows", table.row_count());
3262        for i in 0..table.row_count() {
3263            let status = table.get_value(i, 1);
3264            println!("Row {i}: status = {status:?}");
3265        }
3266
3267        // Test 1: Basic string contains (should work)
3268        println!("\n--- Test 1: status.Contains('pend') ---");
3269        let result = engine.execute(
3270            table.clone(),
3271            "SELECT * FROM test WHERE status.Contains('pend')",
3272        );
3273        match result {
3274            Ok(view) => {
3275                println!("SUCCESS: Found {} matching rows", view.row_count());
3276                assert_eq!(view.row_count(), 2); // Should find both Pending rows
3277            }
3278            Err(e) => {
3279                panic!("Query failed: {e}");
3280            }
3281        }
3282
3283        // Test 2: Numeric contains (should work with type coercion)
3284        println!("\n--- Test 2: price.Contains('9') ---");
3285        let result = engine.execute(
3286            table.clone(),
3287            "SELECT * FROM test WHERE price.Contains('9')",
3288        );
3289        match result {
3290            Ok(view) => {
3291                println!(
3292                    "SUCCESS: Found {} matching rows with price containing '9'",
3293                    view.row_count()
3294                );
3295                // Should find 99.99 row
3296                assert!(view.row_count() >= 1);
3297            }
3298            Err(e) => {
3299                panic!("Numeric coercion query failed: {e}");
3300            }
3301        }
3302
3303        println!("\n=== All tests passed! ===");
3304    }
3305
3306    #[test]
3307    fn test_not_in_clause() {
3308        // Initialize tracing for debug output
3309        let _ = tracing_subscriber::fmt()
3310            .with_max_level(tracing::Level::DEBUG)
3311            .try_init();
3312
3313        let mut table = DataTable::new("test");
3314        table.add_column(DataColumn::new("id"));
3315        table.add_column(DataColumn::new("country"));
3316
3317        // Add test data
3318        table
3319            .add_row(DataRow::new(vec![
3320                DataValue::Integer(1),
3321                DataValue::String("CA".to_string()),
3322            ]))
3323            .unwrap();
3324
3325        table
3326            .add_row(DataRow::new(vec![
3327                DataValue::Integer(2),
3328                DataValue::String("US".to_string()),
3329            ]))
3330            .unwrap();
3331
3332        table
3333            .add_row(DataRow::new(vec![
3334                DataValue::Integer(3),
3335                DataValue::String("UK".to_string()),
3336            ]))
3337            .unwrap();
3338
3339        let table = Arc::new(table);
3340        let engine = QueryEngine::new();
3341
3342        println!("\n=== Testing NOT IN clause ===");
3343        println!("Table has {} rows", table.row_count());
3344        for i in 0..table.row_count() {
3345            let country = table.get_value(i, 1);
3346            println!("Row {i}: country = {country:?}");
3347        }
3348
3349        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
3350        println!("\n--- Test: country NOT IN ('CA') ---");
3351        let result = engine.execute(
3352            table.clone(),
3353            "SELECT * FROM test WHERE country NOT IN ('CA')",
3354        );
3355        match result {
3356            Ok(view) => {
3357                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3358                assert_eq!(view.row_count(), 2); // Should find US and UK
3359            }
3360            Err(e) => {
3361                panic!("NOT IN query failed: {e}");
3362            }
3363        }
3364
3365        println!("\n=== NOT IN test complete! ===");
3366    }
3367
3368    #[test]
3369    fn test_case_insensitive_in_and_not_in() {
3370        // Initialize tracing for debug output
3371        let _ = tracing_subscriber::fmt()
3372            .with_max_level(tracing::Level::DEBUG)
3373            .try_init();
3374
3375        let mut table = DataTable::new("test");
3376        table.add_column(DataColumn::new("id"));
3377        table.add_column(DataColumn::new("country"));
3378
3379        // Add test data with mixed case
3380        table
3381            .add_row(DataRow::new(vec![
3382                DataValue::Integer(1),
3383                DataValue::String("CA".to_string()), // uppercase
3384            ]))
3385            .unwrap();
3386
3387        table
3388            .add_row(DataRow::new(vec![
3389                DataValue::Integer(2),
3390                DataValue::String("us".to_string()), // lowercase
3391            ]))
3392            .unwrap();
3393
3394        table
3395            .add_row(DataRow::new(vec![
3396                DataValue::Integer(3),
3397                DataValue::String("UK".to_string()), // uppercase
3398            ]))
3399            .unwrap();
3400
3401        let table = Arc::new(table);
3402
3403        println!("\n=== Testing Case-Insensitive IN clause ===");
3404        println!("Table has {} rows", table.row_count());
3405        for i in 0..table.row_count() {
3406            let country = table.get_value(i, 1);
3407            println!("Row {i}: country = {country:?}");
3408        }
3409
3410        // Test case-insensitive IN - should match 'CA' with 'ca'
3411        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3412        let engine = QueryEngine::with_case_insensitive(true);
3413        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3414        match result {
3415            Ok(view) => {
3416                println!(
3417                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3418                    view.row_count()
3419                );
3420                assert_eq!(view.row_count(), 1); // Should find CA row
3421            }
3422            Err(e) => {
3423                panic!("Case-insensitive IN query failed: {e}");
3424            }
3425        }
3426
3427        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
3428        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3429        let result = engine.execute(
3430            table.clone(),
3431            "SELECT * FROM test WHERE country NOT IN ('ca')",
3432        );
3433        match result {
3434            Ok(view) => {
3435                println!(
3436                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3437                    view.row_count()
3438                );
3439                assert_eq!(view.row_count(), 2); // Should find us and UK rows
3440            }
3441            Err(e) => {
3442                panic!("Case-insensitive NOT IN query failed: {e}");
3443            }
3444        }
3445
3446        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
3447        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3448        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
3449        let result = engine_case_sensitive
3450            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3451        match result {
3452            Ok(view) => {
3453                println!(
3454                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3455                    view.row_count()
3456                );
3457                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
3458            }
3459            Err(e) => {
3460                panic!("Case-sensitive IN query failed: {e}");
3461            }
3462        }
3463
3464        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3465    }
3466
3467    #[test]
3468    #[ignore = "Parentheses in WHERE clause not yet implemented"]
3469    fn test_parentheses_in_where_clause() {
3470        // Initialize tracing for debug output
3471        let _ = tracing_subscriber::fmt()
3472            .with_max_level(tracing::Level::DEBUG)
3473            .try_init();
3474
3475        let mut table = DataTable::new("test");
3476        table.add_column(DataColumn::new("id"));
3477        table.add_column(DataColumn::new("status"));
3478        table.add_column(DataColumn::new("priority"));
3479
3480        // Add test data
3481        table
3482            .add_row(DataRow::new(vec![
3483                DataValue::Integer(1),
3484                DataValue::String("Pending".to_string()),
3485                DataValue::String("High".to_string()),
3486            ]))
3487            .unwrap();
3488
3489        table
3490            .add_row(DataRow::new(vec![
3491                DataValue::Integer(2),
3492                DataValue::String("Complete".to_string()),
3493                DataValue::String("High".to_string()),
3494            ]))
3495            .unwrap();
3496
3497        table
3498            .add_row(DataRow::new(vec![
3499                DataValue::Integer(3),
3500                DataValue::String("Pending".to_string()),
3501                DataValue::String("Low".to_string()),
3502            ]))
3503            .unwrap();
3504
3505        table
3506            .add_row(DataRow::new(vec![
3507                DataValue::Integer(4),
3508                DataValue::String("Complete".to_string()),
3509                DataValue::String("Low".to_string()),
3510            ]))
3511            .unwrap();
3512
3513        let table = Arc::new(table);
3514        let engine = QueryEngine::new();
3515
3516        println!("\n=== Testing Parentheses in WHERE clause ===");
3517        println!("Table has {} rows", table.row_count());
3518        for i in 0..table.row_count() {
3519            let status = table.get_value(i, 1);
3520            let priority = table.get_value(i, 2);
3521            println!("Row {i}: status = {status:?}, priority = {priority:?}");
3522        }
3523
3524        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
3525        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3526        let result = engine.execute(
3527            table.clone(),
3528            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3529        );
3530        match result {
3531            Ok(view) => {
3532                println!(
3533                    "SUCCESS: Found {} rows with parenthetical logic",
3534                    view.row_count()
3535                );
3536                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
3537            }
3538            Err(e) => {
3539                panic!("Parentheses query failed: {e}");
3540            }
3541        }
3542
3543        println!("\n=== Parentheses test complete! ===");
3544    }
3545
3546    #[test]
3547    #[ignore = "Numeric type coercion needs fixing"]
3548    fn test_numeric_type_coercion() {
3549        // Initialize tracing for debug output
3550        let _ = tracing_subscriber::fmt()
3551            .with_max_level(tracing::Level::DEBUG)
3552            .try_init();
3553
3554        let mut table = DataTable::new("test");
3555        table.add_column(DataColumn::new("id"));
3556        table.add_column(DataColumn::new("price"));
3557        table.add_column(DataColumn::new("quantity"));
3558
3559        // Add test data with different numeric types
3560        table
3561            .add_row(DataRow::new(vec![
3562                DataValue::Integer(1),
3563                DataValue::Float(99.50), // Contains '.'
3564                DataValue::Integer(100),
3565            ]))
3566            .unwrap();
3567
3568        table
3569            .add_row(DataRow::new(vec![
3570                DataValue::Integer(2),
3571                DataValue::Float(150.0), // Contains '.' and '0'
3572                DataValue::Integer(200),
3573            ]))
3574            .unwrap();
3575
3576        table
3577            .add_row(DataRow::new(vec![
3578                DataValue::Integer(3),
3579                DataValue::Integer(75), // No decimal point
3580                DataValue::Integer(50),
3581            ]))
3582            .unwrap();
3583
3584        let table = Arc::new(table);
3585        let engine = QueryEngine::new();
3586
3587        println!("\n=== Testing Numeric Type Coercion ===");
3588        println!("Table has {} rows", table.row_count());
3589        for i in 0..table.row_count() {
3590            let price = table.get_value(i, 1);
3591            let quantity = table.get_value(i, 2);
3592            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3593        }
3594
3595        // Test Contains on float values - should find rows with decimal points
3596        println!("\n--- Test: price.Contains('.') ---");
3597        let result = engine.execute(
3598            table.clone(),
3599            "SELECT * FROM test WHERE price.Contains('.')",
3600        );
3601        match result {
3602            Ok(view) => {
3603                println!(
3604                    "SUCCESS: Found {} rows with decimal points in price",
3605                    view.row_count()
3606                );
3607                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
3608            }
3609            Err(e) => {
3610                panic!("Numeric Contains query failed: {e}");
3611            }
3612        }
3613
3614        // Test Contains on integer values converted to string
3615        println!("\n--- Test: quantity.Contains('0') ---");
3616        let result = engine.execute(
3617            table.clone(),
3618            "SELECT * FROM test WHERE quantity.Contains('0')",
3619        );
3620        match result {
3621            Ok(view) => {
3622                println!(
3623                    "SUCCESS: Found {} rows with '0' in quantity",
3624                    view.row_count()
3625                );
3626                assert_eq!(view.row_count(), 2); // Should find 100 and 200
3627            }
3628            Err(e) => {
3629                panic!("Integer Contains query failed: {e}");
3630            }
3631        }
3632
3633        println!("\n=== Numeric type coercion test complete! ===");
3634    }
3635
3636    #[test]
3637    fn test_datetime_comparisons() {
3638        // Initialize tracing for debug output
3639        let _ = tracing_subscriber::fmt()
3640            .with_max_level(tracing::Level::DEBUG)
3641            .try_init();
3642
3643        let mut table = DataTable::new("test");
3644        table.add_column(DataColumn::new("id"));
3645        table.add_column(DataColumn::new("created_date"));
3646
3647        // Add test data with date strings (as they would come from CSV)
3648        table
3649            .add_row(DataRow::new(vec![
3650                DataValue::Integer(1),
3651                DataValue::String("2024-12-15".to_string()),
3652            ]))
3653            .unwrap();
3654
3655        table
3656            .add_row(DataRow::new(vec![
3657                DataValue::Integer(2),
3658                DataValue::String("2025-01-15".to_string()),
3659            ]))
3660            .unwrap();
3661
3662        table
3663            .add_row(DataRow::new(vec![
3664                DataValue::Integer(3),
3665                DataValue::String("2025-02-15".to_string()),
3666            ]))
3667            .unwrap();
3668
3669        let table = Arc::new(table);
3670        let engine = QueryEngine::new();
3671
3672        println!("\n=== Testing DateTime Comparisons ===");
3673        println!("Table has {} rows", table.row_count());
3674        for i in 0..table.row_count() {
3675            let date = table.get_value(i, 1);
3676            println!("Row {i}: created_date = {date:?}");
3677        }
3678
3679        // Test DateTime constructor comparison - should find dates after 2025-01-01
3680        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3681        let result = engine.execute(
3682            table.clone(),
3683            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3684        );
3685        match result {
3686            Ok(view) => {
3687                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3688                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
3689            }
3690            Err(e) => {
3691                panic!("DateTime comparison query failed: {e}");
3692            }
3693        }
3694
3695        println!("\n=== DateTime comparison test complete! ===");
3696    }
3697
3698    #[test]
3699    fn test_not_with_method_calls() {
3700        // Initialize tracing for debug output
3701        let _ = tracing_subscriber::fmt()
3702            .with_max_level(tracing::Level::DEBUG)
3703            .try_init();
3704
3705        let mut table = DataTable::new("test");
3706        table.add_column(DataColumn::new("id"));
3707        table.add_column(DataColumn::new("status"));
3708
3709        // Add test data
3710        table
3711            .add_row(DataRow::new(vec![
3712                DataValue::Integer(1),
3713                DataValue::String("Pending Review".to_string()),
3714            ]))
3715            .unwrap();
3716
3717        table
3718            .add_row(DataRow::new(vec![
3719                DataValue::Integer(2),
3720                DataValue::String("Complete".to_string()),
3721            ]))
3722            .unwrap();
3723
3724        table
3725            .add_row(DataRow::new(vec![
3726                DataValue::Integer(3),
3727                DataValue::String("Pending Approval".to_string()),
3728            ]))
3729            .unwrap();
3730
3731        let table = Arc::new(table);
3732        let engine = QueryEngine::with_case_insensitive(true);
3733
3734        println!("\n=== Testing NOT with Method Calls ===");
3735        println!("Table has {} rows", table.row_count());
3736        for i in 0..table.row_count() {
3737            let status = table.get_value(i, 1);
3738            println!("Row {i}: status = {status:?}");
3739        }
3740
3741        // Test NOT with Contains - should exclude rows containing "pend"
3742        println!("\n--- Test: NOT status.Contains('pend') ---");
3743        let result = engine.execute(
3744            table.clone(),
3745            "SELECT * FROM test WHERE NOT status.Contains('pend')",
3746        );
3747        match result {
3748            Ok(view) => {
3749                println!(
3750                    "SUCCESS: Found {} rows NOT containing 'pend'",
3751                    view.row_count()
3752                );
3753                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3754            }
3755            Err(e) => {
3756                panic!("NOT Contains query failed: {e}");
3757            }
3758        }
3759
3760        // Test NOT with StartsWith
3761        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3762        let result = engine.execute(
3763            table.clone(),
3764            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3765        );
3766        match result {
3767            Ok(view) => {
3768                println!(
3769                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
3770                    view.row_count()
3771                );
3772                assert_eq!(view.row_count(), 1); // Should find only "Complete"
3773            }
3774            Err(e) => {
3775                panic!("NOT StartsWith query failed: {e}");
3776            }
3777        }
3778
3779        println!("\n=== NOT with method calls test complete! ===");
3780    }
3781
3782    #[test]
3783    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3784    fn test_complex_logical_expressions() {
3785        // Initialize tracing for debug output
3786        let _ = tracing_subscriber::fmt()
3787            .with_max_level(tracing::Level::DEBUG)
3788            .try_init();
3789
3790        let mut table = DataTable::new("test");
3791        table.add_column(DataColumn::new("id"));
3792        table.add_column(DataColumn::new("status"));
3793        table.add_column(DataColumn::new("priority"));
3794        table.add_column(DataColumn::new("assigned"));
3795
3796        // Add comprehensive test data
3797        table
3798            .add_row(DataRow::new(vec![
3799                DataValue::Integer(1),
3800                DataValue::String("Pending".to_string()),
3801                DataValue::String("High".to_string()),
3802                DataValue::String("John".to_string()),
3803            ]))
3804            .unwrap();
3805
3806        table
3807            .add_row(DataRow::new(vec![
3808                DataValue::Integer(2),
3809                DataValue::String("Complete".to_string()),
3810                DataValue::String("High".to_string()),
3811                DataValue::String("Jane".to_string()),
3812            ]))
3813            .unwrap();
3814
3815        table
3816            .add_row(DataRow::new(vec![
3817                DataValue::Integer(3),
3818                DataValue::String("Pending".to_string()),
3819                DataValue::String("Low".to_string()),
3820                DataValue::String("John".to_string()),
3821            ]))
3822            .unwrap();
3823
3824        table
3825            .add_row(DataRow::new(vec![
3826                DataValue::Integer(4),
3827                DataValue::String("In Progress".to_string()),
3828                DataValue::String("Medium".to_string()),
3829                DataValue::String("Jane".to_string()),
3830            ]))
3831            .unwrap();
3832
3833        let table = Arc::new(table);
3834        let engine = QueryEngine::new();
3835
3836        println!("\n=== Testing Complex Logical Expressions ===");
3837        println!("Table has {} rows", table.row_count());
3838        for i in 0..table.row_count() {
3839            let status = table.get_value(i, 1);
3840            let priority = table.get_value(i, 2);
3841            let assigned = table.get_value(i, 3);
3842            println!(
3843                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3844            );
3845        }
3846
3847        // Test complex AND/OR logic
3848        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3849        let result = engine.execute(
3850            table.clone(),
3851            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3852        );
3853        match result {
3854            Ok(view) => {
3855                println!(
3856                    "SUCCESS: Found {} rows with complex logic",
3857                    view.row_count()
3858                );
3859                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
3860            }
3861            Err(e) => {
3862                panic!("Complex logic query failed: {e}");
3863            }
3864        }
3865
3866        // Test NOT with complex expressions
3867        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3868        let result = engine.execute(
3869            table.clone(),
3870            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3871        );
3872        match result {
3873            Ok(view) => {
3874                println!(
3875                    "SUCCESS: Found {} rows with NOT complex logic",
3876                    view.row_count()
3877                );
3878                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
3879            }
3880            Err(e) => {
3881                panic!("NOT complex logic query failed: {e}");
3882            }
3883        }
3884
3885        println!("\n=== Complex logical expressions test complete! ===");
3886    }
3887
3888    #[test]
3889    fn test_mixed_data_types_and_edge_cases() {
3890        // Initialize tracing for debug output
3891        let _ = tracing_subscriber::fmt()
3892            .with_max_level(tracing::Level::DEBUG)
3893            .try_init();
3894
3895        let mut table = DataTable::new("test");
3896        table.add_column(DataColumn::new("id"));
3897        table.add_column(DataColumn::new("value"));
3898        table.add_column(DataColumn::new("nullable_field"));
3899
3900        // Add test data with mixed types and edge cases
3901        table
3902            .add_row(DataRow::new(vec![
3903                DataValue::Integer(1),
3904                DataValue::String("123.45".to_string()),
3905                DataValue::String("present".to_string()),
3906            ]))
3907            .unwrap();
3908
3909        table
3910            .add_row(DataRow::new(vec![
3911                DataValue::Integer(2),
3912                DataValue::Float(678.90),
3913                DataValue::Null,
3914            ]))
3915            .unwrap();
3916
3917        table
3918            .add_row(DataRow::new(vec![
3919                DataValue::Integer(3),
3920                DataValue::Boolean(true),
3921                DataValue::String("also present".to_string()),
3922            ]))
3923            .unwrap();
3924
3925        table
3926            .add_row(DataRow::new(vec![
3927                DataValue::Integer(4),
3928                DataValue::String("false".to_string()),
3929                DataValue::Null,
3930            ]))
3931            .unwrap();
3932
3933        let table = Arc::new(table);
3934        let engine = QueryEngine::new();
3935
3936        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3937        println!("Table has {} rows", table.row_count());
3938        for i in 0..table.row_count() {
3939            let value = table.get_value(i, 1);
3940            let nullable = table.get_value(i, 2);
3941            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3942        }
3943
3944        // Test type coercion with boolean Contains
3945        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3946        let result = engine.execute(
3947            table.clone(),
3948            "SELECT * FROM test WHERE value.Contains('true')",
3949        );
3950        match result {
3951            Ok(view) => {
3952                println!(
3953                    "SUCCESS: Found {} rows with boolean coercion",
3954                    view.row_count()
3955                );
3956                assert_eq!(view.row_count(), 1); // Should find the boolean true row
3957            }
3958            Err(e) => {
3959                panic!("Boolean coercion query failed: {e}");
3960            }
3961        }
3962
3963        // Test multiple IN values with mixed types
3964        println!("\n--- Test: id IN (1, 3) ---");
3965        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3966        match result {
3967            Ok(view) => {
3968                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3969                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
3970            }
3971            Err(e) => {
3972                panic!("Multiple IN values query failed: {e}");
3973            }
3974        }
3975
3976        println!("\n=== Mixed data types test complete! ===");
3977    }
3978
3979    /// Test that aggregate-only queries return exactly one row (regression test)
3980    #[test]
3981    fn test_aggregate_only_single_row() {
3982        let table = create_test_stock_data();
3983        let engine = QueryEngine::new();
3984
3985        // Test query with multiple aggregates - should return exactly 1 row
3986        let result = engine
3987            .execute(
3988                table.clone(),
3989                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3990            )
3991            .expect("Query should succeed");
3992
3993        assert_eq!(
3994            result.row_count(),
3995            1,
3996            "Aggregate-only query should return exactly 1 row"
3997        );
3998        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3999
4000        // Verify the actual values are correct
4001        let source = result.source();
4002        let row = source.get_row(0).expect("Should have first row");
4003
4004        // COUNT(*) should be 5 (total rows)
4005        assert_eq!(row.values[0], DataValue::Integer(5));
4006
4007        // MIN should be 99.5
4008        assert_eq!(row.values[1], DataValue::Float(99.5));
4009
4010        // MAX should be 105.0
4011        assert_eq!(row.values[2], DataValue::Float(105.0));
4012
4013        // AVG should be approximately 102.4
4014        if let DataValue::Float(avg) = &row.values[3] {
4015            assert!(
4016                (avg - 102.4).abs() < 0.01,
4017                "Average should be approximately 102.4, got {}",
4018                avg
4019            );
4020        } else {
4021            panic!("AVG should return a Float value");
4022        }
4023    }
4024
4025    /// Test single aggregate function returns single row
4026    #[test]
4027    fn test_single_aggregate_single_row() {
4028        let table = create_test_stock_data();
4029        let engine = QueryEngine::new();
4030
4031        let result = engine
4032            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
4033            .expect("Query should succeed");
4034
4035        assert_eq!(
4036            result.row_count(),
4037            1,
4038            "Single aggregate query should return exactly 1 row"
4039        );
4040        assert_eq!(result.column_count(), 1, "Should have 1 column");
4041
4042        let source = result.source();
4043        let row = source.get_row(0).expect("Should have first row");
4044        assert_eq!(row.values[0], DataValue::Integer(5));
4045    }
4046
4047    /// Test aggregate with WHERE clause filtering
4048    #[test]
4049    fn test_aggregate_with_where_single_row() {
4050        let table = create_test_stock_data();
4051        let engine = QueryEngine::new();
4052
4053        // Filter to only high-value stocks (>= 103.0) and aggregate
4054        let result = engine
4055            .execute(
4056                table.clone(),
4057                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
4058            )
4059            .expect("Query should succeed");
4060
4061        assert_eq!(
4062            result.row_count(),
4063            1,
4064            "Filtered aggregate query should return exactly 1 row"
4065        );
4066        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
4067
4068        let source = result.source();
4069        let row = source.get_row(0).expect("Should have first row");
4070
4071        // Should find 2 rows (103.5 and 105.0)
4072        assert_eq!(row.values[0], DataValue::Integer(2));
4073        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
4074        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
4075    }
4076
4077    #[test]
4078    fn test_not_in_parsing() {
4079        use crate::sql::recursive_parser::Parser;
4080
4081        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
4082        println!("\n=== Testing NOT IN parsing ===");
4083        println!("Parsing query: {query}");
4084
4085        let mut parser = Parser::new(query);
4086        match parser.parse() {
4087            Ok(statement) => {
4088                println!("Parsed statement: {statement:#?}");
4089                if let Some(where_clause) = statement.where_clause {
4090                    println!("WHERE conditions: {:#?}", where_clause.conditions);
4091                    if let Some(first_condition) = where_clause.conditions.first() {
4092                        println!("First condition expression: {:#?}", first_condition.expr);
4093                    }
4094                }
4095            }
4096            Err(e) => {
4097                panic!("Parse error: {e}");
4098            }
4099        }
4100    }
4101
4102    /// Create test stock data for aggregate testing
4103    fn create_test_stock_data() -> Arc<DataTable> {
4104        let mut table = DataTable::new("stock");
4105
4106        table.add_column(DataColumn::new("symbol"));
4107        table.add_column(DataColumn::new("close"));
4108        table.add_column(DataColumn::new("volume"));
4109
4110        // Add 5 rows of test data
4111        let test_data = vec![
4112            ("AAPL", 99.5, 1000),
4113            ("AAPL", 101.2, 1500),
4114            ("AAPL", 103.5, 2000),
4115            ("AAPL", 105.0, 1200),
4116            ("AAPL", 102.8, 1800),
4117        ];
4118
4119        for (symbol, close, volume) in test_data {
4120            table
4121                .add_row(DataRow::new(vec![
4122                    DataValue::String(symbol.to_string()),
4123                    DataValue::Float(close),
4124                    DataValue::Integer(volume),
4125                ]))
4126                .expect("Should add row successfully");
4127        }
4128
4129        Arc::new(table)
4130    }
4131}
4132
// Additional tests are kept in the separate file `query_engine_tests.rs`
// (located via the `#[path]` attribute) and are compiled only for test builds.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;