sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::TableSource;
25use crate::sql::recursive_parser::{
26    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
27    TableFunction,
28};
29
30/// Execution context for tracking table aliases and scope during query execution
31#[derive(Debug, Clone)]
32pub struct ExecutionContext {
33    /// Map from alias to actual table/CTE name
34    /// Example: "t" -> "#tmp_trades", "a" -> "data"
35    alias_map: HashMap<String, String>,
36}
37
38impl ExecutionContext {
39    /// Create a new empty execution context
40    pub fn new() -> Self {
41        Self {
42            alias_map: HashMap::new(),
43        }
44    }
45
46    /// Register a table alias
47    pub fn register_alias(&mut self, alias: String, table_name: String) {
48        debug!("Registering alias: {} -> {}", alias, table_name);
49        self.alias_map.insert(alias, table_name);
50    }
51
52    /// Resolve an alias to its actual table name
53    /// Returns the alias itself if not found in the map
54    pub fn resolve_alias(&self, name: &str) -> String {
55        self.alias_map
56            .get(name)
57            .cloned()
58            .unwrap_or_else(|| name.to_string())
59    }
60
61    /// Check if a name is a registered alias
62    pub fn is_alias(&self, name: &str) -> bool {
63        self.alias_map.contains_key(name)
64    }
65
66    /// Get a copy of all registered aliases
67    pub fn get_aliases(&self) -> HashMap<String, String> {
68        self.alias_map.clone()
69    }
70
71    /// Resolve a column reference to its index in the table, handling aliases
72    ///
73    /// This is the unified column resolution function that should be used by all
74    /// SQL clauses (WHERE, SELECT, ORDER BY, GROUP BY) to ensure consistent
75    /// alias resolution behavior.
76    ///
77    /// Resolution strategy:
78    /// 1. If column_ref has a table_prefix (e.g., "t" in "t.amount"):
79    ///    a. Resolve the alias: t -> actual_table_name
80    ///    b. Try qualified lookup: "actual_table_name.amount"
81    ///    c. Fall back to unqualified: "amount"
82    /// 2. If column_ref has no prefix:
83    ///    a. Try simple column name lookup: "amount"
84    ///    b. Try as qualified name if it contains a dot: "table.column"
85    pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
86        if let Some(table_prefix) = &column_ref.table_prefix {
87            // Qualified column reference: resolve the alias first
88            let actual_table = self.resolve_alias(table_prefix);
89
90            // Try qualified lookup: "actual_table.column"
91            let qualified_name = format!("{}.{}", actual_table, column_ref.name);
92            if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
93                debug!(
94                    "Resolved {}.{} -> qualified column '{}' at index {}",
95                    table_prefix, column_ref.name, qualified_name, idx
96                );
97                return Ok(idx);
98            }
99
100            // Fall back to unqualified lookup
101            if let Some(idx) = table.get_column_index(&column_ref.name) {
102                debug!(
103                    "Resolved {}.{} -> unqualified column '{}' at index {}",
104                    table_prefix, column_ref.name, column_ref.name, idx
105                );
106                return Ok(idx);
107            }
108
109            // Not found with either qualified or unqualified name
110            Err(anyhow!(
111                "Column '{}' not found. Table '{}' may not support qualified column names",
112                qualified_name,
113                actual_table
114            ))
115        } else {
116            // Unqualified column reference
117            if let Some(idx) = table.get_column_index(&column_ref.name) {
118                debug!(
119                    "Resolved unqualified column '{}' at index {}",
120                    column_ref.name, idx
121                );
122                return Ok(idx);
123            }
124
125            // If the column name contains a dot, try it as a qualified name
126            if column_ref.name.contains('.') {
127                if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
128                    debug!(
129                        "Resolved '{}' as qualified column at index {}",
130                        column_ref.name, idx
131                    );
132                    return Ok(idx);
133                }
134            }
135
136            // Column not found - provide helpful error
137            let suggestion = self.find_similar_column(table, &column_ref.name);
138            match suggestion {
139                Some(similar) => Err(anyhow!(
140                    "Column '{}' not found. Did you mean '{}'?",
141                    column_ref.name,
142                    similar
143                )),
144                None => Err(anyhow!("Column '{}' not found", column_ref.name)),
145            }
146        }
147    }
148
149    /// Find a similar column name using edit distance (for better error messages)
150    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
151        let columns = table.column_names();
152        let mut best_match: Option<(String, usize)> = None;
153
154        for col in columns {
155            let distance = edit_distance(name, &col);
156            if distance <= 2 {
157                // Allow up to 2 character differences
158                match best_match {
159                    Some((_, best_dist)) if distance < best_dist => {
160                        best_match = Some((col.clone(), distance));
161                    }
162                    None => {
163                        best_match = Some((col.clone(), distance));
164                    }
165                    _ => {}
166                }
167            }
168        }
169
170        best_match.map(|(name, _)| name)
171    }
172}
173
174impl Default for ExecutionContext {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
180/// Calculate edit distance between two strings (Levenshtein distance)
181fn edit_distance(a: &str, b: &str) -> usize {
182    let len_a = a.chars().count();
183    let len_b = b.chars().count();
184
185    if len_a == 0 {
186        return len_b;
187    }
188    if len_b == 0 {
189        return len_a;
190    }
191
192    let mut matrix = vec![vec![0; len_b + 1]; len_a + 1];
193
194    for i in 0..=len_a {
195        matrix[i][0] = i;
196    }
197    for j in 0..=len_b {
198        matrix[0][j] = j;
199    }
200
201    let a_chars: Vec<char> = a.chars().collect();
202    let b_chars: Vec<char> = b.chars().collect();
203
204    for i in 1..=len_a {
205        for j in 1..=len_b {
206            let cost = if a_chars[i - 1] == b_chars[j - 1] {
207                0
208            } else {
209                1
210            };
211            matrix[i][j] = min(
212                min(matrix[i - 1][j] + 1, matrix[i][j - 1] + 1),
213                matrix[i - 1][j - 1] + cost,
214            );
215        }
216    }
217
218    matrix[len_a][len_b]
219}
220
221/// Query engine that executes SQL directly on `DataTable`
222#[derive(Clone)]
223pub struct QueryEngine {
224    case_insensitive: bool,
225    date_notation: String,
226    _behavior_config: Option<BehaviorConfig>,
227}
228
229impl Default for QueryEngine {
230    fn default() -> Self {
231        Self::new()
232    }
233}
234
235impl QueryEngine {
236    #[must_use]
237    pub fn new() -> Self {
238        Self {
239            case_insensitive: false,
240            date_notation: get_date_notation(),
241            _behavior_config: None,
242        }
243    }
244
245    #[must_use]
246    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
247        let case_insensitive = config.case_insensitive_default;
248        // Use get_date_notation() to respect environment variable override
249        let date_notation = get_date_notation();
250        Self {
251            case_insensitive,
252            date_notation,
253            _behavior_config: Some(config),
254        }
255    }
256
257    #[must_use]
258    pub fn with_date_notation(_date_notation: String) -> Self {
259        Self {
260            case_insensitive: false,
261            date_notation: get_date_notation(), // Always use the global function
262            _behavior_config: None,
263        }
264    }
265
266    #[must_use]
267    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
268        Self {
269            case_insensitive,
270            date_notation: get_date_notation(),
271            _behavior_config: None,
272        }
273    }
274
275    #[must_use]
276    pub fn with_case_insensitive_and_date_notation(
277        case_insensitive: bool,
278        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
279    ) -> Self {
280        Self {
281            case_insensitive,
282            date_notation: get_date_notation(), // Always use the global function
283            _behavior_config: None,
284        }
285    }
286
287    /// Find a column name similar to the given name using edit distance
288    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
289        let columns = table.column_names();
290        let mut best_match: Option<(String, usize)> = None;
291
292        for col in columns {
293            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
294            // Only suggest if distance is small (likely a typo)
295            // Allow up to 3 edits for longer names
296            let max_distance = if name.len() > 10 { 3 } else { 2 };
297            if distance <= max_distance {
298                match &best_match {
299                    None => best_match = Some((col, distance)),
300                    Some((_, best_dist)) if distance < *best_dist => {
301                        best_match = Some((col, distance));
302                    }
303                    _ => {}
304                }
305            }
306        }
307
308        best_match.map(|(name, _)| name)
309    }
310
311    /// Calculate Levenshtein edit distance between two strings
312    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
313        let len1 = s1.len();
314        let len2 = s2.len();
315        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
316
317        for i in 0..=len1 {
318            matrix[i][0] = i;
319        }
320        for j in 0..=len2 {
321            matrix[0][j] = j;
322        }
323
324        for (i, c1) in s1.chars().enumerate() {
325            for (j, c2) in s2.chars().enumerate() {
326                let cost = usize::from(c1 != c2);
327                matrix[i + 1][j + 1] = std::cmp::min(
328                    matrix[i][j + 1] + 1, // deletion
329                    std::cmp::min(
330                        matrix[i + 1][j] + 1, // insertion
331                        matrix[i][j] + cost,  // substitution
332                    ),
333                );
334            }
335        }
336
337        matrix[len1][len2]
338    }
339
340    /// Check if an expression contains UNNEST function call
341    fn contains_unnest(expr: &SqlExpression) -> bool {
342        match expr {
343            // Direct UNNEST variant
344            SqlExpression::Unnest { .. } => true,
345            SqlExpression::FunctionCall { name, args, .. } => {
346                if name.to_uppercase() == "UNNEST" {
347                    return true;
348                }
349                // Check recursively in function arguments
350                args.iter().any(Self::contains_unnest)
351            }
352            SqlExpression::BinaryOp { left, right, .. } => {
353                Self::contains_unnest(left) || Self::contains_unnest(right)
354            }
355            SqlExpression::Not { expr } => Self::contains_unnest(expr),
356            SqlExpression::CaseExpression {
357                when_branches,
358                else_branch,
359            } => {
360                when_branches.iter().any(|branch| {
361                    Self::contains_unnest(&branch.condition)
362                        || Self::contains_unnest(&branch.result)
363                }) || else_branch
364                    .as_ref()
365                    .map_or(false, |e| Self::contains_unnest(e))
366            }
367            SqlExpression::SimpleCaseExpression {
368                expr,
369                when_branches,
370                else_branch,
371            } => {
372                Self::contains_unnest(expr)
373                    || when_branches.iter().any(|branch| {
374                        Self::contains_unnest(&branch.value)
375                            || Self::contains_unnest(&branch.result)
376                    })
377                    || else_branch
378                        .as_ref()
379                        .map_or(false, |e| Self::contains_unnest(e))
380            }
381            SqlExpression::InList { expr, values } => {
382                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
383            }
384            SqlExpression::NotInList { expr, values } => {
385                Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
386            }
387            SqlExpression::Between { expr, lower, upper } => {
388                Self::contains_unnest(expr)
389                    || Self::contains_unnest(lower)
390                    || Self::contains_unnest(upper)
391            }
392            SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
393            SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
394            SqlExpression::ScalarSubquery { .. } => false, // Subqueries are handled separately
395            SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
396            SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
397            SqlExpression::ChainedMethodCall { base, args, .. } => {
398                Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
399            }
400            SqlExpression::Unnest { .. } => true, // Direct UNNEST variant
401            _ => false,
402        }
403    }
404
405    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
406    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407        let (view, _plan) = self.execute_with_plan(table, sql)?;
408        Ok(view)
409    }
410
411    /// Execute a SQL query with optional temp table registry access
412    pub fn execute_with_temp_tables(
413        &self,
414        table: Arc<DataTable>,
415        sql: &str,
416        temp_tables: Option<&TempTableRegistry>,
417    ) -> Result<DataView> {
418        let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419        Ok(view)
420    }
421
422    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
423    pub fn execute_statement(
424        &self,
425        table: Arc<DataTable>,
426        statement: SelectStatement,
427    ) -> Result<DataView> {
428        self.execute_statement_with_temp_tables(table, statement, None)
429    }
430
431    /// Execute a parsed SelectStatement with optional temp table access
432    pub fn execute_statement_with_temp_tables(
433        &self,
434        table: Arc<DataTable>,
435        statement: SelectStatement,
436        temp_tables: Option<&TempTableRegistry>,
437    ) -> Result<DataView> {
438        // First process CTEs to build context
439        let mut cte_context = HashMap::new();
440
441        // Add temp tables to CTE context if provided
442        if let Some(temp_registry) = temp_tables {
443            for table_name in temp_registry.list_tables() {
444                if let Some(temp_table) = temp_registry.get(&table_name) {
445                    debug!("Adding temp table {} to CTE context", table_name);
446                    let view = DataView::new(temp_table);
447                    cte_context.insert(table_name, Arc::new(view));
448                }
449            }
450        }
451
452        for cte in &statement.ctes {
453            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454            // Execute the CTE based on its type
455            let cte_result = match &cte.cte_type {
456                CTEType::Standard(query) => {
457                    // Execute the CTE query (it might reference earlier CTEs)
458                    let view = self.build_view_with_context(
459                        table.clone(),
460                        query.clone(),
461                        &mut cte_context,
462                    )?;
463
464                    // Materialize the view and enrich columns with qualified names
465                    let mut materialized = self.materialize_view(view)?;
466
467                    // Enrich columns with qualified names for proper scoping
468                    for column in materialized.columns_mut() {
469                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470                        column.source_table = Some(cte.name.clone());
471                    }
472
473                    DataView::new(Arc::new(materialized))
474                }
475                CTEType::Web(web_spec) => {
476                    // Fetch data from URL
477                    use crate::web::http_fetcher::WebDataFetcher;
478
479                    let fetcher = WebDataFetcher::new()?;
480                    // Pass None for query context (no full SQL available in these contexts)
481                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483                    // Enrich columns with qualified names for proper scoping
484                    for column in data_table.columns_mut() {
485                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486                        column.source_table = Some(cte.name.clone());
487                    }
488
489                    // Convert DataTable to DataView
490                    DataView::new(Arc::new(data_table))
491                }
492            };
493            // Store the result in the context for later use
494            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495            debug!(
496                "QueryEngine: CTE '{}' pre-processed, stored in context",
497                cte.name
498            );
499        }
500
501        // Now process subqueries with CTE context available
502        let mut subquery_executor =
503            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506        // Build the view with the same CTE context
507        self.build_view_with_context(table, processed_statement, &mut cte_context)
508    }
509
510    /// Execute a statement with provided CTE context (for subqueries)
511    pub fn execute_statement_with_cte_context(
512        &self,
513        table: Arc<DataTable>,
514        statement: SelectStatement,
515        cte_context: &HashMap<String, Arc<DataView>>,
516    ) -> Result<DataView> {
517        // Clone the context so we can add any CTEs from this statement
518        let mut local_context = cte_context.clone();
519
520        // Process any CTEs in this statement (they might be nested)
521        for cte in &statement.ctes {
522            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523            let cte_result = match &cte.cte_type {
524                CTEType::Standard(query) => {
525                    let view = self.build_view_with_context(
526                        table.clone(),
527                        query.clone(),
528                        &mut local_context,
529                    )?;
530
531                    // Materialize the view and enrich columns with qualified names
532                    let mut materialized = self.materialize_view(view)?;
533
534                    // Enrich columns with qualified names for proper scoping
535                    for column in materialized.columns_mut() {
536                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537                        column.source_table = Some(cte.name.clone());
538                    }
539
540                    DataView::new(Arc::new(materialized))
541                }
542                CTEType::Web(web_spec) => {
543                    // Fetch data from URL
544                    use crate::web::http_fetcher::WebDataFetcher;
545
546                    let fetcher = WebDataFetcher::new()?;
547                    // Pass None for query context (no full SQL available in these contexts)
548                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550                    // Enrich columns with qualified names for proper scoping
551                    for column in data_table.columns_mut() {
552                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553                        column.source_table = Some(cte.name.clone());
554                    }
555
556                    // Convert DataTable to DataView
557                    DataView::new(Arc::new(data_table))
558                }
559            };
560            local_context.insert(cte.name.clone(), Arc::new(cte_result));
561        }
562
563        // Process subqueries with the complete context
564        let mut subquery_executor =
565            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568        // Build the view
569        self.build_view_with_context(table, processed_statement, &mut local_context)
570    }
571
572    /// Execute a query and return both the result and the execution plan
573    pub fn execute_with_plan(
574        &self,
575        table: Arc<DataTable>,
576        sql: &str,
577    ) -> Result<(DataView, ExecutionPlan)> {
578        self.execute_with_plan_and_temp_tables(table, sql, None)
579    }
580
581    /// Execute a query with temp tables and return both the result and the execution plan
582    pub fn execute_with_plan_and_temp_tables(
583        &self,
584        table: Arc<DataTable>,
585        sql: &str,
586        temp_tables: Option<&TempTableRegistry>,
587    ) -> Result<(DataView, ExecutionPlan)> {
588        let mut plan_builder = ExecutionPlanBuilder::new();
589        let start_time = Instant::now();
590
591        // Parse the SQL query
592        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
593        plan_builder.add_detail(format!("Query: {}", sql));
594        let mut parser = Parser::new(sql);
595        let statement = parser
596            .parse()
597            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
598        plan_builder.add_detail(format!("Parsed successfully"));
599        if let Some(from) = &statement.from_table {
600            plan_builder.add_detail(format!("FROM: {}", from));
601        }
602        if statement.where_clause.is_some() {
603            plan_builder.add_detail("WHERE clause present".to_string());
604        }
605        plan_builder.end_step();
606
607        // First process CTEs to build context
608        let mut cte_context = HashMap::new();
609
610        // Add temp tables to CTE context if provided
611        if let Some(temp_registry) = temp_tables {
612            for table_name in temp_registry.list_tables() {
613                if let Some(temp_table) = temp_registry.get(&table_name) {
614                    debug!("Adding temp table {} to CTE context", table_name);
615                    let view = DataView::new(temp_table);
616                    cte_context.insert(table_name, Arc::new(view));
617                }
618            }
619        }
620
621        if !statement.ctes.is_empty() {
622            plan_builder.begin_step(
623                StepType::CTE,
624                format!("Process {} CTEs", statement.ctes.len()),
625            );
626
627            for cte in &statement.ctes {
628                let cte_start = Instant::now();
629                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
630
631                let cte_result = match &cte.cte_type {
632                    CTEType::Standard(query) => {
633                        // Add CTE query details
634                        if let Some(from) = &query.from_table {
635                            plan_builder.add_detail(format!("Source: {}", from));
636                        }
637                        if query.where_clause.is_some() {
638                            plan_builder.add_detail("Has WHERE clause".to_string());
639                        }
640                        if query.group_by.is_some() {
641                            plan_builder.add_detail("Has GROUP BY".to_string());
642                        }
643
644                        debug!(
645                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
646                            cte.name,
647                            cte_context.keys().collect::<Vec<_>>()
648                        );
649
650                        // Process subqueries in the CTE's query FIRST
651                        // This allows the subqueries to see all previously defined CTEs
652                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
653                            self.clone(),
654                            table.clone(),
655                            cte_context.clone(),
656                        );
657                        let processed_query = subquery_executor.execute_subqueries(query)?;
658
659                        let view = self.build_view_with_context(
660                            table.clone(),
661                            processed_query,
662                            &mut cte_context,
663                        )?;
664
665                        // Materialize the view and enrich columns with qualified names
666                        let mut materialized = self.materialize_view(view)?;
667
668                        // Enrich columns with qualified names for proper scoping
669                        for column in materialized.columns_mut() {
670                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
671                            column.source_table = Some(cte.name.clone());
672                        }
673
674                        DataView::new(Arc::new(materialized))
675                    }
676                    CTEType::Web(web_spec) => {
677                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
678                        if let Some(format) = &web_spec.format {
679                            plan_builder.add_detail(format!("Format: {:?}", format));
680                        }
681                        if let Some(cache) = web_spec.cache_seconds {
682                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
683                        }
684
685                        // Fetch data from URL
686                        use crate::web::http_fetcher::WebDataFetcher;
687
688                        let fetcher = WebDataFetcher::new()?;
689                        // Pass None for query context - each WEB CTE is independent
690                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
691
692                        // Enrich columns with qualified names for proper scoping
693                        for column in data_table.columns_mut() {
694                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
695                            column.source_table = Some(cte.name.clone());
696                        }
697
698                        // Convert DataTable to DataView
699                        DataView::new(Arc::new(data_table))
700                    }
701                };
702
703                // Record CTE statistics
704                plan_builder.set_rows_out(cte_result.row_count());
705                plan_builder.add_detail(format!(
706                    "Result: {} rows, {} columns",
707                    cte_result.row_count(),
708                    cte_result.column_count()
709                ));
710                plan_builder.add_detail(format!(
711                    "Execution time: {:.3}ms",
712                    cte_start.elapsed().as_secs_f64() * 1000.0
713                ));
714
715                debug!(
716                    "QueryEngine: Storing CTE '{}' in context with {} rows",
717                    cte.name,
718                    cte_result.row_count()
719                );
720                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
721                plan_builder.end_step();
722            }
723
724            plan_builder.add_detail(format!(
725                "All {} CTEs cached in context",
726                statement.ctes.len()
727            ));
728            plan_builder.end_step();
729        }
730
731        // Process subqueries in the statement with CTE context
732        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
733        let mut subquery_executor =
734            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
735
736        // Check if there are subqueries to process
737        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
738            // This is a simplified check - in reality we'd need to walk the AST
739            format!("{:?}", w).contains("Subquery")
740        });
741
742        if has_subqueries {
743            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
744        }
745
746        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
747
748        if has_subqueries {
749            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
750        } else {
751            plan_builder.add_detail("No subqueries to process".to_string());
752        }
753
754        plan_builder.end_step();
755        let result = self.build_view_with_context_and_plan(
756            table,
757            processed_statement,
758            &mut cte_context,
759            &mut plan_builder,
760        )?;
761
762        let total_duration = start_time.elapsed();
763        info!(
764            "Query execution complete: total={:?}, rows={}",
765            total_duration,
766            result.row_count()
767        );
768
769        let plan = plan_builder.build();
770        Ok((result, plan))
771    }
772
773    /// Build a `DataView` from a parsed SQL statement
774    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775        let mut cte_context = HashMap::new();
776        self.build_view_with_context(table, statement, &mut cte_context)
777    }
778
779    /// Build a DataView from a SelectStatement with CTE context
780    fn build_view_with_context(
781        &self,
782        table: Arc<DataTable>,
783        statement: SelectStatement,
784        cte_context: &mut HashMap<String, Arc<DataView>>,
785    ) -> Result<DataView> {
786        let mut dummy_plan = ExecutionPlanBuilder::new();
787        let mut exec_context = ExecutionContext::new();
788        self.build_view_with_context_and_plan_and_exec(
789            table,
790            statement,
791            cte_context,
792            &mut dummy_plan,
793            &mut exec_context,
794        )
795    }
796
797    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
798    fn build_view_with_context_and_plan(
799        &self,
800        table: Arc<DataTable>,
801        statement: SelectStatement,
802        cte_context: &mut HashMap<String, Arc<DataView>>,
803        plan: &mut ExecutionPlanBuilder,
804    ) -> Result<DataView> {
805        let mut exec_context = ExecutionContext::new();
806        self.build_view_with_context_and_plan_and_exec(
807            table,
808            statement,
809            cte_context,
810            plan,
811            &mut exec_context,
812        )
813    }
814
815    /// Build a DataView with CTE context, execution plan, and alias resolution context
816    fn build_view_with_context_and_plan_and_exec(
817        &self,
818        table: Arc<DataTable>,
819        statement: SelectStatement,
820        cte_context: &mut HashMap<String, Arc<DataView>>,
821        plan: &mut ExecutionPlanBuilder,
822        exec_context: &mut ExecutionContext,
823    ) -> Result<DataView> {
824        // First, process any CTEs that aren't already in the context
825        for cte in &statement.ctes {
826            // Skip if already processed (e.g., by execute_select for WEB CTEs)
827            if cte_context.contains_key(&cte.name) {
828                debug!(
829                    "QueryEngine: CTE '{}' already in context, skipping",
830                    cte.name
831                );
832                continue;
833            }
834
835            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836            debug!(
837                "QueryEngine: Available CTEs for '{}': {:?}",
838                cte.name,
839                cte_context.keys().collect::<Vec<_>>()
840            );
841
842            // Execute the CTE query (it might reference earlier CTEs)
843            let cte_result = match &cte.cte_type {
844                CTEType::Standard(query) => {
845                    let view =
846                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848                    // Materialize the view and enrich columns with qualified names
849                    let mut materialized = self.materialize_view(view)?;
850
851                    // Enrich columns with qualified names for proper scoping
852                    for column in materialized.columns_mut() {
853                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854                        column.source_table = Some(cte.name.clone());
855                    }
856
857                    DataView::new(Arc::new(materialized))
858                }
859                CTEType::Web(_web_spec) => {
860                    // Web CTEs should have been processed earlier in execute_select
861                    return Err(anyhow!(
862                        "Web CTEs should be processed in execute_select method"
863                    ));
864                }
865            };
866
867            // Store the result in the context for later use
868            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869            debug!(
870                "QueryEngine: CTE '{}' processed, stored in context",
871                cte.name
872            );
873        }
874
875        // Determine the source table for the main query
876        let source_table = if let Some(ref table_func) = statement.from_function {
877            // Handle table functions like RANGE()
878            debug!("QueryEngine: Processing table function...");
879            match table_func {
880                TableFunction::Generator { name, args } => {
881                    // Use the generator registry to create the table
882                    use crate::sql::generators::GeneratorRegistry;
883
884                    // Create generator registry (could be cached in QueryEngine)
885                    let registry = GeneratorRegistry::new();
886
887                    if let Some(generator) = registry.get(name) {
888                        // Evaluate arguments
889                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
890                            &table,
891                            self.date_notation.clone(),
892                        );
893                        let dummy_row = 0;
894
895                        let mut evaluated_args = Vec::new();
896                        for arg in args {
897                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898                        }
899
900                        // Generate the table
901                        generator.generate(evaluated_args)?
902                    } else {
903                        return Err(anyhow!("Unknown generator function: {}", name));
904                    }
905                }
906            }
907        } else if let Some(ref subquery) = statement.from_subquery {
908            // Execute the subquery and use its result as the source
909            debug!("QueryEngine: Processing FROM subquery...");
910            let subquery_result =
911                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913            // Convert the DataView to a DataTable for use as source
914            // This materializes the subquery result
915            let materialized = self.materialize_view(subquery_result)?;
916            Arc::new(materialized)
917        } else if let Some(ref table_name) = statement.from_table {
918            // Check if this references a CTE
919            if let Some(cte_view) = cte_context.get(table_name) {
920                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921                // Materialize the CTE view as a table
922                let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924                // Apply alias to qualified column names if present
925                if let Some(ref alias) = statement.from_alias {
926                    debug!(
927                        "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928                        alias, table_name
929                    );
930                    for column in materialized.columns_mut() {
931                        // Replace the CTE name with the alias in qualified names
932                        if let Some(ref qualified_name) = column.qualified_name {
933                            if qualified_name.starts_with(&format!("{}.", table_name)) {
934                                column.qualified_name =
935                                    Some(qualified_name.replace(
936                                        &format!("{}.", table_name),
937                                        &format!("{}.", alias),
938                                    ));
939                            }
940                        }
941                        // Update source table to reflect the alias
942                        if column.source_table.as_ref() == Some(table_name) {
943                            column.source_table = Some(alias.clone());
944                        }
945                    }
946                }
947
948                Arc::new(materialized)
949            } else {
950                // Regular table reference - use the provided table
951                table.clone()
952            }
953        } else {
954            // No FROM clause - use the provided table
955            table.clone()
956        };
957
958        // Register alias in execution context if present
959        if let Some(ref alias) = statement.from_alias {
960            if let Some(ref table_name) = statement.from_table {
961                exec_context.register_alias(alias.clone(), table_name.clone());
962            }
963        }
964
965        // Process JOINs if present
966        let final_table = if !statement.joins.is_empty() {
967            plan.begin_step(
968                StepType::Join,
969                format!("Process {} JOINs", statement.joins.len()),
970            );
971            plan.set_rows_in(source_table.row_count());
972
973            let join_executor = HashJoinExecutor::new(self.case_insensitive);
974            let mut current_table = source_table;
975
976            for (idx, join_clause) in statement.joins.iter().enumerate() {
977                let join_start = Instant::now();
978                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981                plan.add_detail(format!(
982                    "Executing {:?} JOIN on {} condition(s)",
983                    join_clause.join_type,
984                    join_clause.condition.conditions.len()
985                ));
986
987                // Resolve the right table for the join
988                let right_table = match &join_clause.table {
989                    TableSource::Table(name) => {
990                        // Check if it's a CTE reference
991                        if let Some(cte_view) = cte_context.get(name) {
992                            let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994                            // Apply alias to qualified column names if present
995                            if let Some(ref alias) = join_clause.alias {
996                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997                                for column in materialized.columns_mut() {
998                                    // Replace the CTE name with the alias in qualified names
999                                    if let Some(ref qualified_name) = column.qualified_name {
1000                                        if qualified_name.starts_with(&format!("{}.", name)) {
1001                                            column.qualified_name = Some(qualified_name.replace(
1002                                                &format!("{}.", name),
1003                                                &format!("{}.", alias),
1004                                            ));
1005                                        }
1006                                    }
1007                                    // Update source table to reflect the alias
1008                                    if column.source_table.as_ref() == Some(name) {
1009                                        column.source_table = Some(alias.clone());
1010                                    }
1011                                }
1012                            }
1013
1014                            Arc::new(materialized)
1015                        } else {
1016                            // For now, we need the actual table data
1017                            // In a real implementation, this would load from file
1018                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019                        }
1020                    }
1021                    TableSource::DerivedTable { query, alias: _ } => {
1022                        // Execute the subquery
1023                        let subquery_result = self.build_view_with_context(
1024                            table.clone(),
1025                            *query.clone(),
1026                            cte_context,
1027                        )?;
1028                        let materialized = self.materialize_view(subquery_result)?;
1029                        Arc::new(materialized)
1030                    }
1031                };
1032
1033                // Execute the join
1034                let joined = join_executor.execute_join(
1035                    current_table.clone(),
1036                    join_clause,
1037                    right_table.clone(),
1038                )?;
1039
1040                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041                plan.set_rows_out(joined.row_count());
1042                plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043                plan.add_detail(format!(
1044                    "Join time: {:.3}ms",
1045                    join_start.elapsed().as_secs_f64() * 1000.0
1046                ));
1047                plan.end_step();
1048
1049                current_table = Arc::new(joined);
1050            }
1051
1052            plan.set_rows_out(current_table.row_count());
1053            plan.add_detail(format!(
1054                "Final result after all joins: {} rows",
1055                current_table.row_count()
1056            ));
1057            plan.end_step();
1058            current_table
1059        } else {
1060            source_table
1061        };
1062
1063        // Continue with the existing build_view logic but using final_table
1064        self.build_view_internal_with_plan_and_exec(
1065            final_table,
1066            statement,
1067            plan,
1068            Some(exec_context),
1069        )
1070    }
1071
1072    /// Materialize a DataView into a new DataTable
1073    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074        let source = view.source();
1075        let mut result_table = DataTable::new("derived");
1076
1077        // Get the visible columns from the view
1078        let visible_cols = view.visible_column_indices().to_vec();
1079
1080        // Copy column definitions
1081        for col_idx in &visible_cols {
1082            let col = &source.columns[*col_idx];
1083            let new_col = DataColumn {
1084                name: col.name.clone(),
1085                data_type: col.data_type.clone(),
1086                nullable: col.nullable,
1087                unique_values: col.unique_values,
1088                null_count: col.null_count,
1089                metadata: col.metadata.clone(),
1090                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1091                source_table: col.source_table.clone(),     // Preserve source table
1092            };
1093            result_table.add_column(new_col);
1094        }
1095
1096        // Copy visible rows
1097        for row_idx in view.visible_row_indices() {
1098            let source_row = &source.rows[*row_idx];
1099            let mut new_row = DataRow { values: Vec::new() };
1100
1101            for col_idx in &visible_cols {
1102                new_row.values.push(source_row.values[*col_idx].clone());
1103            }
1104
1105            result_table.add_row(new_row);
1106        }
1107
1108        Ok(result_table)
1109    }
1110
1111    fn build_view_internal(
1112        &self,
1113        table: Arc<DataTable>,
1114        statement: SelectStatement,
1115    ) -> Result<DataView> {
1116        let mut dummy_plan = ExecutionPlanBuilder::new();
1117        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118    }
1119
1120    fn build_view_internal_with_plan(
1121        &self,
1122        table: Arc<DataTable>,
1123        statement: SelectStatement,
1124        plan: &mut ExecutionPlanBuilder,
1125    ) -> Result<DataView> {
1126        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1127    }
1128
1129    fn build_view_internal_with_plan_and_exec(
1130        &self,
1131        table: Arc<DataTable>,
1132        statement: SelectStatement,
1133        plan: &mut ExecutionPlanBuilder,
1134        exec_context: Option<&ExecutionContext>,
1135    ) -> Result<DataView> {
1136        debug!(
1137            "QueryEngine::build_view - select_items: {:?}",
1138            statement.select_items
1139        );
1140        debug!(
1141            "QueryEngine::build_view - where_clause: {:?}",
1142            statement.where_clause
1143        );
1144
1145        // Start with all rows visible
1146        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148        // Apply WHERE clause filtering using recursive evaluator
1149        if let Some(where_clause) = &statement.where_clause {
1150            let total_rows = table.row_count();
1151            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155            plan.set_rows_in(total_rows);
1156            plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158            // Add details about WHERE conditions
1159            for condition in &where_clause.conditions {
1160                plan.add_detail(format!("Condition: {:?}", condition.expr));
1161            }
1162
1163            let filter_start = Instant::now();
1164            // Create an evaluation context for caching compiled regexes
1165            let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167            // Filter visible rows based on WHERE clause
1168            let mut filtered_rows = Vec::new();
1169            for row_idx in visible_rows {
1170                // Only log for first few rows to avoid performance impact
1171                if row_idx < 3 {
1172                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1173                }
1174
1175                // Use exec_context for alias resolution if available, otherwise use eval_context
1176                let mut evaluator = if let Some(exec_ctx) = exec_context {
1177                    RecursiveWhereEvaluator::with_exec_context(
1178                        &table,
1179                        exec_ctx,
1180                        self.case_insensitive,
1181                    )
1182                } else {
1183                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1184                };
1185
1186                match evaluator.evaluate(where_clause, row_idx) {
1187                    Ok(result) => {
1188                        if row_idx < 3 {
1189                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1190                        }
1191                        if result {
1192                            filtered_rows.push(row_idx);
1193                        }
1194                    }
1195                    Err(e) => {
1196                        if row_idx < 3 {
1197                            debug!(
1198                                "QueryEngine: WHERE evaluation error for row {}: {}",
1199                                row_idx, e
1200                            );
1201                        }
1202                        // Propagate WHERE clause errors instead of silently ignoring them
1203                        return Err(e);
1204                    }
1205                }
1206            }
1207
1208            // Log regex cache statistics
1209            let (compilations, cache_hits) = eval_context.get_stats();
1210            if compilations > 0 || cache_hits > 0 {
1211                debug!(
1212                    "LIKE pattern cache: {} compilations, {} cache hits",
1213                    compilations, cache_hits
1214                );
1215            }
1216            visible_rows = filtered_rows;
1217            let filter_duration = filter_start.elapsed();
1218            info!(
1219                "WHERE clause filtering: {} rows -> {} rows in {:?}",
1220                total_rows,
1221                visible_rows.len(),
1222                filter_duration
1223            );
1224
1225            plan.set_rows_out(visible_rows.len());
1226            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1227            plan.add_detail(format!(
1228                "Filter time: {:.3}ms",
1229                filter_duration.as_secs_f64() * 1000.0
1230            ));
1231            plan.end_step();
1232        }
1233
1234        // Create initial DataView with filtered rows
1235        let mut view = DataView::new(table.clone());
1236        view = view.with_rows(visible_rows);
1237
1238        // Handle GROUP BY if present
1239        if let Some(group_by_exprs) = &statement.group_by {
1240            if !group_by_exprs.is_empty() {
1241                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1242
1243                plan.begin_step(
1244                    StepType::GroupBy,
1245                    format!("GROUP BY {} expressions", group_by_exprs.len()),
1246                );
1247                plan.set_rows_in(view.row_count());
1248                plan.add_detail(format!("Input: {} rows", view.row_count()));
1249                for expr in group_by_exprs {
1250                    plan.add_detail(format!("Group by: {:?}", expr));
1251                }
1252
1253                let group_start = Instant::now();
1254                view = self.apply_group_by(
1255                    view,
1256                    group_by_exprs,
1257                    &statement.select_items,
1258                    statement.having.as_ref(),
1259                    plan,
1260                )?;
1261
1262                plan.set_rows_out(view.row_count());
1263                plan.add_detail(format!("Output: {} groups", view.row_count()));
1264                plan.add_detail(format!(
1265                    "Overall time: {:.3}ms",
1266                    group_start.elapsed().as_secs_f64() * 1000.0
1267                ));
1268                plan.end_step();
1269            }
1270        } else {
1271            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
1272            if !statement.select_items.is_empty() {
1273                // Check if we have ANY non-star items (not just the first one)
1274                let has_non_star_items = statement
1275                    .select_items
1276                    .iter()
1277                    .any(|item| !matches!(item, SelectItem::Star));
1278
1279                // Apply select items if:
1280                // 1. We have computed expressions or explicit columns
1281                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
1282                if has_non_star_items || statement.select_items.len() > 1 {
1283                    view = self.apply_select_items(
1284                        view,
1285                        &statement.select_items,
1286                        &statement,
1287                        exec_context,
1288                    )?;
1289                } else {
1290                }
1291                // If it's just a single star, no projection needed
1292            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1293                debug!("QueryEngine: Using legacy columns path");
1294                // Fallback to legacy column projection for backward compatibility
1295                // Use the current view's source table, not the original table
1296                let source_table = view.source();
1297                let column_indices =
1298                    self.resolve_column_indices(source_table, &statement.columns)?;
1299                view = view.with_columns(column_indices);
1300            }
1301        }
1302
1303        // Apply DISTINCT if specified
1304        if statement.distinct {
1305            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1306            plan.set_rows_in(view.row_count());
1307            plan.add_detail(format!("Input: {} rows", view.row_count()));
1308
1309            let distinct_start = Instant::now();
1310            view = self.apply_distinct(view)?;
1311
1312            plan.set_rows_out(view.row_count());
1313            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1314            plan.add_detail(format!(
1315                "Distinct time: {:.3}ms",
1316                distinct_start.elapsed().as_secs_f64() * 1000.0
1317            ));
1318            plan.end_step();
1319        }
1320
1321        // Apply ORDER BY sorting
1322        if let Some(order_by_columns) = &statement.order_by {
1323            if !order_by_columns.is_empty() {
1324                plan.begin_step(
1325                    StepType::Sort,
1326                    format!("ORDER BY {} columns", order_by_columns.len()),
1327                );
1328                plan.set_rows_in(view.row_count());
1329                for col in order_by_columns {
1330                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
1331                }
1332
1333                let sort_start = Instant::now();
1334                view =
1335                    self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1336
1337                plan.add_detail(format!(
1338                    "Sort time: {:.3}ms",
1339                    sort_start.elapsed().as_secs_f64() * 1000.0
1340                ));
1341                plan.end_step();
1342            }
1343        }
1344
1345        // Apply LIMIT/OFFSET
1346        if let Some(limit) = statement.limit {
1347            let offset = statement.offset.unwrap_or(0);
1348            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1349            plan.set_rows_in(view.row_count());
1350            if offset > 0 {
1351                plan.add_detail(format!("OFFSET: {}", offset));
1352            }
1353            view = view.with_limit(limit, offset);
1354            plan.set_rows_out(view.row_count());
1355            plan.add_detail(format!("Output: {} rows", view.row_count()));
1356            plan.end_step();
1357        }
1358
1359        Ok(view)
1360    }
1361
1362    /// Resolve column names to indices
1363    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1364        let mut indices = Vec::new();
1365        let table_columns = table.column_names();
1366
1367        for col_name in columns {
1368            let index = table_columns
1369                .iter()
1370                .position(|c| c.eq_ignore_ascii_case(col_name))
1371                .ok_or_else(|| {
1372                    let suggestion = self.find_similar_column(table, col_name);
1373                    match suggestion {
1374                        Some(similar) => anyhow::anyhow!(
1375                            "Column '{}' not found. Did you mean '{}'?",
1376                            col_name,
1377                            similar
1378                        ),
1379                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1380                    }
1381                })?;
1382            indices.push(index);
1383        }
1384
1385        Ok(indices)
1386    }
1387
1388    /// Apply SELECT items (columns and computed expressions) to create new view
1389    fn apply_select_items(
1390        &self,
1391        view: DataView,
1392        select_items: &[SelectItem],
1393        statement: &SelectStatement,
1394        exec_context: Option<&ExecutionContext>,
1395    ) -> Result<DataView> {
1396        debug!(
1397            "QueryEngine::apply_select_items - items: {:?}",
1398            select_items
1399        );
1400        debug!(
1401            "QueryEngine::apply_select_items - input view has {} rows",
1402            view.row_count()
1403        );
1404
1405        // Check if any SELECT item contains UNNEST - if so, use row expansion mode
1406        let has_unnest = select_items.iter().any(|item| match item {
1407            SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1408            _ => false,
1409        });
1410
1411        if has_unnest {
1412            debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1413            return self.apply_select_with_row_expansion(view, select_items);
1414        }
1415
1416        // Check if this is an aggregate query:
1417        // 1. At least one aggregate function exists
1418        // 2. All other items are either aggregates or constants (aggregate-compatible)
1419        let has_aggregates = select_items.iter().any(|item| match item {
1420            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1421            SelectItem::Column(_) => false,
1422            SelectItem::Star => false,
1423        });
1424
1425        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1426            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1427            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
1428            SelectItem::Star => false,      // Star is not aggregate-compatible
1429        });
1430
1431        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1432            // Special handling for aggregate queries with constants (no GROUP BY)
1433            // These should produce exactly one row
1434            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1435            return self.apply_aggregate_select(view, select_items);
1436        }
1437
1438        // Check if we need to create computed columns
1439        let has_computed_expressions = select_items
1440            .iter()
1441            .any(|item| matches!(item, SelectItem::Expression { .. }));
1442
1443        debug!(
1444            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1445            has_computed_expressions
1446        );
1447
1448        if !has_computed_expressions {
1449            // Simple case: only columns, use existing projection logic
1450            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1451            return Ok(view.with_columns(column_indices));
1452        }
1453
1454        // Complex case: we have computed expressions
1455        // IMPORTANT: We create a PROJECTED view, not a new table
1456        // This preserves the original DataTable reference
1457
1458        let source_table = view.source();
1459        let visible_rows = view.visible_row_indices();
1460
1461        // Create a temporary table just for the computed result view
1462        // But this table is only used for the current query result
1463        let mut computed_table = DataTable::new("query_result");
1464
1465        // First, expand any Star selectors to actual columns
1466        let mut expanded_items = Vec::new();
1467        for item in select_items {
1468            match item {
1469                SelectItem::Star => {
1470                    // Expand * to all columns from source table
1471                    for col_name in source_table.column_names() {
1472                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1473                            col_name.to_string(),
1474                        )));
1475                    }
1476                }
1477                _ => expanded_items.push(item.clone()),
1478            }
1479        }
1480
1481        // Add columns based on expanded SelectItems, handling duplicates
1482        let mut column_name_counts: std::collections::HashMap<String, usize> =
1483            std::collections::HashMap::new();
1484
1485        for item in &expanded_items {
1486            let base_name = match item {
1487                SelectItem::Column(col_ref) => col_ref.name.clone(),
1488                SelectItem::Expression { alias, .. } => alias.clone(),
1489                SelectItem::Star => unreachable!("Star should have been expanded"),
1490            };
1491
1492            // Check if this column name has been used before
1493            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1494            let column_name = if *count == 0 {
1495                // First occurrence, use the name as-is
1496                base_name.clone()
1497            } else {
1498                // Duplicate, append a suffix
1499                format!("{base_name}_{count}")
1500            };
1501            *count += 1;
1502
1503            computed_table.add_column(DataColumn::new(&column_name));
1504        }
1505
1506        // Calculate values for each row
1507        let mut evaluator =
1508            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1509
1510        // Populate table aliases from exec_context if available
1511        if let Some(exec_ctx) = exec_context {
1512            let aliases = exec_ctx.get_aliases();
1513            if !aliases.is_empty() {
1514                debug!(
1515                    "Applying {} aliases to evaluator: {:?}",
1516                    aliases.len(),
1517                    aliases
1518                );
1519                evaluator = evaluator.with_table_aliases(aliases);
1520            }
1521        }
1522
1523        for &row_idx in visible_rows {
1524            let mut row_values = Vec::new();
1525
1526            for item in &expanded_items {
1527                let value = match item {
1528                    SelectItem::Column(col_ref) => {
1529                        // Use evaluator for column resolution (handles aliases properly)
1530                        match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1531                            Ok(val) => val,
1532                            Err(e) => {
1533                                return Err(anyhow!(
1534                                    "Failed to evaluate column {}: {}",
1535                                    col_ref.to_sql(),
1536                                    e
1537                                ));
1538                            }
1539                        }
1540                    }
1541                    SelectItem::Expression { expr, .. } => {
1542                        // Computed expression
1543                        evaluator.evaluate(expr, row_idx)?
1544                    }
1545                    SelectItem::Star => unreachable!("Star should have been expanded"),
1546                };
1547                row_values.push(value);
1548            }
1549
1550            computed_table
1551                .add_row(DataRow::new(row_values))
1552                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1553        }
1554
1555        // Return a view of the computed result
1556        // This is a temporary view for this query only
1557        Ok(DataView::new(Arc::new(computed_table)))
1558    }
1559
1560    /// Apply SELECT with row expansion (for UNNEST, EXPLODE, etc.)
1561    fn apply_select_with_row_expansion(
1562        &self,
1563        view: DataView,
1564        select_items: &[SelectItem],
1565    ) -> Result<DataView> {
1566        debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1567
1568        let source_table = view.source();
1569        let visible_rows = view.visible_row_indices();
1570        let expander_registry = RowExpanderRegistry::new();
1571
1572        // Create result table
1573        let mut result_table = DataTable::new("unnest_result");
1574
1575        // Expand * to columns and set up result columns
1576        let mut expanded_items = Vec::new();
1577        for item in select_items {
1578            match item {
1579                SelectItem::Star => {
1580                    for col_name in source_table.column_names() {
1581                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1582                            col_name.to_string(),
1583                        )));
1584                    }
1585                }
1586                _ => expanded_items.push(item.clone()),
1587            }
1588        }
1589
1590        // Add columns to result table
1591        for item in &expanded_items {
1592            let column_name = match item {
1593                SelectItem::Column(col_ref) => col_ref.name.clone(),
1594                SelectItem::Expression { alias, .. } => alias.clone(),
1595                SelectItem::Star => unreachable!("Star should have been expanded"),
1596            };
1597            result_table.add_column(DataColumn::new(&column_name));
1598        }
1599
1600        // Process each input row
1601        let mut evaluator =
1602            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1603
1604        for &row_idx in visible_rows {
1605            // First pass: identify UNNEST expressions and collect their expansion arrays
1606            let mut unnest_expansions = Vec::new();
1607            let mut unnest_indices = Vec::new();
1608
1609            for (col_idx, item) in expanded_items.iter().enumerate() {
1610                if let SelectItem::Expression { expr, .. } = item {
1611                    if let Some(expansion_result) = self.try_expand_unnest(
1612                        expr,
1613                        source_table,
1614                        row_idx,
1615                        &mut evaluator,
1616                        &expander_registry,
1617                    )? {
1618                        unnest_expansions.push(expansion_result);
1619                        unnest_indices.push(col_idx);
1620                    }
1621                }
1622            }
1623
1624            // Determine how many output rows to generate
1625            let expansion_count = if unnest_expansions.is_empty() {
1626                1 // No UNNEST, just one row
1627            } else {
1628                unnest_expansions
1629                    .iter()
1630                    .map(|exp| exp.row_count())
1631                    .max()
1632                    .unwrap_or(1)
1633            };
1634
1635            // Generate output rows
1636            for output_idx in 0..expansion_count {
1637                let mut row_values = Vec::new();
1638
1639                for (col_idx, item) in expanded_items.iter().enumerate() {
1640                    // Check if this column is an UNNEST column
1641                    let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1642
1643                    let value = if let Some(unnest_idx) = unnest_position {
1644                        // Get value from expansion array (or NULL if exhausted)
1645                        let expansion = &unnest_expansions[unnest_idx];
1646                        expansion
1647                            .values
1648                            .get(output_idx)
1649                            .cloned()
1650                            .unwrap_or(DataValue::Null)
1651                    } else {
1652                        // Regular column or non-UNNEST expression - replicate from input
1653                        match item {
1654                            SelectItem::Column(col_ref) => {
1655                                let col_idx =
1656                                    source_table.get_column_index(&col_ref.name).ok_or_else(
1657                                        || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1658                                    )?;
1659                                let row = source_table
1660                                    .get_row(row_idx)
1661                                    .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1662                                row.get(col_idx)
1663                                    .ok_or_else(|| {
1664                                        anyhow::anyhow!("Column {} not found in row", col_idx)
1665                                    })?
1666                                    .clone()
1667                            }
1668                            SelectItem::Expression { expr, .. } => {
1669                                // Non-UNNEST expression - evaluate once and replicate
1670                                evaluator.evaluate(expr, row_idx)?
1671                            }
1672                            SelectItem::Star => unreachable!(),
1673                        }
1674                    };
1675
1676                    row_values.push(value);
1677                }
1678
1679                result_table
1680                    .add_row(DataRow::new(row_values))
1681                    .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1682            }
1683        }
1684
1685        debug!(
1686            "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1687            visible_rows.len(),
1688            result_table.row_count()
1689        );
1690
1691        Ok(DataView::new(Arc::new(result_table)))
1692    }
1693
1694    /// Try to expand an expression if it's an UNNEST call
1695    /// Returns Some(ExpansionResult) if successful, None if not an UNNEST
1696    fn try_expand_unnest(
1697        &self,
1698        expr: &SqlExpression,
1699        _source_table: &DataTable,
1700        row_idx: usize,
1701        evaluator: &mut ArithmeticEvaluator,
1702        expander_registry: &RowExpanderRegistry,
1703    ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1704        // Check for UNNEST variant (direct syntax)
1705        if let SqlExpression::Unnest { column, delimiter } = expr {
1706            // Evaluate the column expression
1707            let column_value = evaluator.evaluate(column, row_idx)?;
1708
1709            // Delimiter is already a string literal
1710            let delimiter_value = DataValue::String(delimiter.clone());
1711
1712            // Get the UNNEST expander
1713            let expander = expander_registry
1714                .get("UNNEST")
1715                .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1716
1717            // Expand the value
1718            let expansion = expander.expand(&column_value, &[delimiter_value])?;
1719            return Ok(Some(expansion));
1720        }
1721
1722        // Also check for FunctionCall form (for compatibility)
1723        if let SqlExpression::FunctionCall { name, args, .. } = expr {
1724            if name.to_uppercase() == "UNNEST" {
1725                // UNNEST(column, delimiter)
1726                if args.len() != 2 {
1727                    return Err(anyhow::anyhow!(
1728                        "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1729                    ));
1730                }
1731
1732                // Evaluate the column expression (first arg)
1733                let column_value = evaluator.evaluate(&args[0], row_idx)?;
1734
1735                // Evaluate the delimiter expression (second arg)
1736                let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1737
1738                // Get the UNNEST expander
1739                let expander = expander_registry
1740                    .get("UNNEST")
1741                    .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1742
1743                // Expand the value
1744                let expansion = expander.expand(&column_value, &[delimiter_value])?;
1745                return Ok(Some(expansion));
1746            }
1747        }
1748
1749        Ok(None)
1750    }
1751
1752    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1753    fn apply_aggregate_select(
1754        &self,
1755        view: DataView,
1756        select_items: &[SelectItem],
1757    ) -> Result<DataView> {
1758        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1759
1760        let source_table = view.source();
1761        let mut result_table = DataTable::new("aggregate_result");
1762
1763        // Add columns for each select item
1764        for item in select_items {
1765            let column_name = match item {
1766                SelectItem::Expression { alias, .. } => alias.clone(),
1767                _ => unreachable!("Should only have expressions in aggregate-only query"),
1768            };
1769            result_table.add_column(DataColumn::new(&column_name));
1770        }
1771
1772        // Create evaluator with visible rows from the view (for filtered aggregates)
1773        let visible_rows = view.visible_row_indices().to_vec();
1774        let mut evaluator =
1775            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1776                .with_visible_rows(visible_rows);
1777
1778        // Evaluate each aggregate expression once (they handle all rows internally)
1779        let mut row_values = Vec::new();
1780        for item in select_items {
1781            match item {
1782                SelectItem::Expression { expr, .. } => {
1783                    // The evaluator will handle aggregates over all rows
1784                    // We pass row_index=0 but aggregates ignore it and process all rows
1785                    let value = evaluator.evaluate(expr, 0)?;
1786                    row_values.push(value);
1787                }
1788                _ => unreachable!("Should only have expressions in aggregate-only query"),
1789            }
1790        }
1791
1792        // Add the single result row
1793        result_table
1794            .add_row(DataRow::new(row_values))
1795            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1796
1797        Ok(DataView::new(Arc::new(result_table)))
1798    }
1799
1800    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1801    fn resolve_select_columns(
1802        &self,
1803        table: &DataTable,
1804        select_items: &[SelectItem],
1805    ) -> Result<Vec<usize>> {
1806        let mut indices = Vec::new();
1807        let table_columns = table.column_names();
1808
1809        for item in select_items {
1810            match item {
1811                SelectItem::Column(col_ref) => {
1812                    // Check if this has a table prefix
1813                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
1814                        // For qualified references, ONLY try qualified lookup - no fallback
1815                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1816                        table.find_column_by_qualified_name(&qualified_name)
1817                            .ok_or_else(|| {
1818                                // Check if any columns have qualified names for better error message
1819                                let has_qualified = table.columns.iter()
1820                                    .any(|c| c.qualified_name.is_some());
1821                                if !has_qualified {
1822                                    anyhow::anyhow!(
1823                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1824                                        qualified_name, table_prefix
1825                                    )
1826                                } else {
1827                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
1828                                }
1829                            })?
1830                    } else {
1831                        // Simple column name lookup
1832                        table_columns
1833                            .iter()
1834                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1835                            .ok_or_else(|| {
1836                                let suggestion = self.find_similar_column(table, &col_ref.name);
1837                                match suggestion {
1838                                    Some(similar) => anyhow::anyhow!(
1839                                        "Column '{}' not found. Did you mean '{}'?",
1840                                        col_ref.name,
1841                                        similar
1842                                    ),
1843                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1844                                }
1845                            })?
1846                    };
1847                    indices.push(index);
1848                }
1849                SelectItem::Star => {
1850                    // Expand * to all column indices
1851                    for i in 0..table_columns.len() {
1852                        indices.push(i);
1853                    }
1854                }
1855                SelectItem::Expression { .. } => {
1856                    return Err(anyhow::anyhow!(
1857                        "Computed expressions require new table creation"
1858                    ));
1859                }
1860            }
1861        }
1862
1863        Ok(indices)
1864    }
1865
1866    /// Apply DISTINCT to remove duplicate rows
1867    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1868        use std::collections::HashSet;
1869
1870        let source = view.source();
1871        let visible_cols = view.visible_column_indices();
1872        let visible_rows = view.visible_row_indices();
1873
1874        // Build a set to track unique rows
1875        let mut seen_rows = HashSet::new();
1876        let mut unique_row_indices = Vec::new();
1877
1878        for &row_idx in visible_rows {
1879            // Build a key representing this row's visible column values
1880            let mut row_key = Vec::new();
1881            for &col_idx in visible_cols {
1882                let value = source
1883                    .get_value(row_idx, col_idx)
1884                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1885                // Convert value to a hashable representation
1886                row_key.push(format!("{:?}", value));
1887            }
1888
1889            // Check if we've seen this row before
1890            if seen_rows.insert(row_key) {
1891                // First time seeing this row combination
1892                unique_row_indices.push(row_idx);
1893            }
1894        }
1895
1896        // Create a new view with only unique rows
1897        Ok(view.with_rows(unique_row_indices))
1898    }
1899
1900    /// Apply multi-column ORDER BY sorting to the view
1901    fn apply_multi_order_by(
1902        &self,
1903        mut view: DataView,
1904        order_by_columns: &[OrderByColumn],
1905    ) -> Result<DataView> {
1906        self.apply_multi_order_by_with_context(view, order_by_columns, None)
1907    }
1908
1909    /// Apply multi-column ORDER BY sorting with exec_context for alias resolution
1910    fn apply_multi_order_by_with_context(
1911        &self,
1912        mut view: DataView,
1913        order_by_columns: &[OrderByColumn],
1914        exec_context: Option<&ExecutionContext>,
1915    ) -> Result<DataView> {
1916        // Build list of (source_column_index, ascending) tuples
1917        let mut sort_columns = Vec::new();
1918
1919        for order_col in order_by_columns {
1920            // Try to find the column index, handling qualified column names (table.column)
1921            let col_index = if order_col.column.contains('.') {
1922                // Qualified column name - extract unqualified part
1923                if let Some(dot_pos) = order_col.column.rfind('.') {
1924                    let col_name = &order_col.column[dot_pos + 1..];
1925
1926                    // After SELECT processing, columns are unqualified
1927                    // So just use the column name part
1928                    debug!(
1929                        "ORDER BY: Extracting unqualified column '{}' from '{}'",
1930                        col_name, order_col.column
1931                    );
1932                    view.source().get_column_index(col_name)
1933                } else {
1934                    view.source().get_column_index(&order_col.column)
1935                }
1936            } else {
1937                // Simple column name
1938                view.source().get_column_index(&order_col.column)
1939            }
1940            .ok_or_else(|| {
1941                // If not found, provide helpful error with suggestions
1942                let suggestion = self.find_similar_column(view.source(), &order_col.column);
1943                match suggestion {
1944                    Some(similar) => anyhow::anyhow!(
1945                        "Column '{}' not found. Did you mean '{}'?",
1946                        order_col.column,
1947                        similar
1948                    ),
1949                    None => {
1950                        // Also list available columns for debugging
1951                        let available_cols = view.source().column_names().join(", ");
1952                        anyhow::anyhow!(
1953                            "Column '{}' not found. Available columns: {}",
1954                            order_col.column,
1955                            available_cols
1956                        )
1957                    }
1958                }
1959            })?;
1960
1961            let ascending = matches!(order_col.direction, SortDirection::Asc);
1962            sort_columns.push((col_index, ascending));
1963        }
1964
1965        // Apply multi-column sorting
1966        view.apply_multi_sort(&sort_columns)?;
1967        Ok(view)
1968    }
1969
1970    /// Apply GROUP BY to the view with optional HAVING clause
1971    fn apply_group_by(
1972        &self,
1973        view: DataView,
1974        group_by_exprs: &[SqlExpression],
1975        select_items: &[SelectItem],
1976        having: Option<&SqlExpression>,
1977        plan: &mut ExecutionPlanBuilder,
1978    ) -> Result<DataView> {
1979        // Use the new expression-based GROUP BY implementation
1980        let (result_view, phase_info) = self.apply_group_by_expressions(
1981            view,
1982            group_by_exprs,
1983            select_items,
1984            having,
1985            self.case_insensitive,
1986            self.date_notation.clone(),
1987        )?;
1988
1989        // Add detailed phase information to the execution plan
1990        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1991        plan.add_detail(format!(
1992            "Phase 1 - Group Building: {:.3}ms",
1993            phase_info.phase2_key_building.as_secs_f64() * 1000.0
1994        ));
1995        plan.add_detail(format!(
1996            "  • Processing {} rows into {} groups",
1997            phase_info.total_rows, phase_info.num_groups
1998        ));
1999        plan.add_detail(format!(
2000            "Phase 2 - Aggregation: {:.3}ms",
2001            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2002        ));
2003        if phase_info.phase4_having_evaluation > Duration::ZERO {
2004            plan.add_detail(format!(
2005                "Phase 3 - HAVING Filter: {:.3}ms",
2006                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2007            ));
2008            plan.add_detail(format!(
2009                "  • Filtered {} groups",
2010                phase_info.groups_filtered_by_having
2011            ));
2012        }
2013        plan.add_detail(format!(
2014            "Total GROUP BY time: {:.3}ms",
2015            phase_info.total_time.as_secs_f64() * 1000.0
2016        ));
2017
2018        Ok(result_view)
2019    }
2020
2021    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
2022    /// This helps pre-size hash tables for better performance
2023    pub fn estimate_group_cardinality(
2024        &self,
2025        view: &DataView,
2026        group_by_exprs: &[SqlExpression],
2027    ) -> usize {
2028        // If we have few rows, just return the row count as upper bound
2029        let row_count = view.get_visible_rows().len();
2030        if row_count <= 100 {
2031            return row_count;
2032        }
2033
2034        // Sample first 1000 rows or 10% of data, whichever is smaller
2035        let sample_size = min(1000, row_count / 10).max(100);
2036        let mut seen = FxHashSet::default();
2037
2038        let visible_rows = view.get_visible_rows();
2039        for (i, &row_idx) in visible_rows.iter().enumerate() {
2040            if i >= sample_size {
2041                break;
2042            }
2043
2044            // Evaluate GROUP BY expressions for this row
2045            let mut key_values = Vec::new();
2046            for expr in group_by_exprs {
2047                let mut evaluator = ArithmeticEvaluator::new(view.source());
2048                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2049                key_values.push(value);
2050            }
2051
2052            seen.insert(key_values);
2053        }
2054
2055        // Estimate total cardinality based on sample
2056        let sample_cardinality = seen.len();
2057        let estimated = (sample_cardinality * row_count) / sample_size;
2058
2059        // Cap at row count and ensure minimum of sample cardinality
2060        estimated.min(row_count).max(sample_cardinality)
2061    }
2062}
2063
2064#[cfg(test)]
2065mod tests {
2066    use super::*;
2067    use crate::data::datatable::{DataColumn, DataRow, DataValue};
2068
2069    fn create_test_table() -> Arc<DataTable> {
2070        let mut table = DataTable::new("test");
2071
2072        // Add columns
2073        table.add_column(DataColumn::new("id"));
2074        table.add_column(DataColumn::new("name"));
2075        table.add_column(DataColumn::new("age"));
2076
2077        // Add rows
2078        table
2079            .add_row(DataRow::new(vec![
2080                DataValue::Integer(1),
2081                DataValue::String("Alice".to_string()),
2082                DataValue::Integer(30),
2083            ]))
2084            .unwrap();
2085
2086        table
2087            .add_row(DataRow::new(vec![
2088                DataValue::Integer(2),
2089                DataValue::String("Bob".to_string()),
2090                DataValue::Integer(25),
2091            ]))
2092            .unwrap();
2093
2094        table
2095            .add_row(DataRow::new(vec![
2096                DataValue::Integer(3),
2097                DataValue::String("Charlie".to_string()),
2098                DataValue::Integer(35),
2099            ]))
2100            .unwrap();
2101
2102        Arc::new(table)
2103    }
2104
2105    #[test]
2106    fn test_select_all() {
2107        let table = create_test_table();
2108        let engine = QueryEngine::new();
2109
2110        let view = engine
2111            .execute(table.clone(), "SELECT * FROM users")
2112            .unwrap();
2113        assert_eq!(view.row_count(), 3);
2114        assert_eq!(view.column_count(), 3);
2115    }
2116
2117    #[test]
2118    fn test_select_columns() {
2119        let table = create_test_table();
2120        let engine = QueryEngine::new();
2121
2122        let view = engine
2123            .execute(table.clone(), "SELECT name, age FROM users")
2124            .unwrap();
2125        assert_eq!(view.row_count(), 3);
2126        assert_eq!(view.column_count(), 2);
2127    }
2128
2129    #[test]
2130    fn test_select_with_limit() {
2131        let table = create_test_table();
2132        let engine = QueryEngine::new();
2133
2134        let view = engine
2135            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
2136            .unwrap();
2137        assert_eq!(view.row_count(), 2);
2138    }
2139
2140    #[test]
2141    fn test_type_coercion_contains() {
2142        // Initialize tracing for debug output
2143        let _ = tracing_subscriber::fmt()
2144            .with_max_level(tracing::Level::DEBUG)
2145            .try_init();
2146
2147        let mut table = DataTable::new("test");
2148        table.add_column(DataColumn::new("id"));
2149        table.add_column(DataColumn::new("status"));
2150        table.add_column(DataColumn::new("price"));
2151
2152        // Add test data with mixed types
2153        table
2154            .add_row(DataRow::new(vec![
2155                DataValue::Integer(1),
2156                DataValue::String("Pending".to_string()),
2157                DataValue::Float(99.99),
2158            ]))
2159            .unwrap();
2160
2161        table
2162            .add_row(DataRow::new(vec![
2163                DataValue::Integer(2),
2164                DataValue::String("Confirmed".to_string()),
2165                DataValue::Float(150.50),
2166            ]))
2167            .unwrap();
2168
2169        table
2170            .add_row(DataRow::new(vec![
2171                DataValue::Integer(3),
2172                DataValue::String("Pending".to_string()),
2173                DataValue::Float(75.00),
2174            ]))
2175            .unwrap();
2176
2177        let table = Arc::new(table);
2178        let engine = QueryEngine::new();
2179
2180        println!("\n=== Testing WHERE clause with Contains ===");
2181        println!("Table has {} rows", table.row_count());
2182        for i in 0..table.row_count() {
2183            let status = table.get_value(i, 1);
2184            println!("Row {i}: status = {status:?}");
2185        }
2186
2187        // Test 1: Basic string contains (should work)
2188        println!("\n--- Test 1: status.Contains('pend') ---");
2189        let result = engine.execute(
2190            table.clone(),
2191            "SELECT * FROM test WHERE status.Contains('pend')",
2192        );
2193        match result {
2194            Ok(view) => {
2195                println!("SUCCESS: Found {} matching rows", view.row_count());
2196                assert_eq!(view.row_count(), 2); // Should find both Pending rows
2197            }
2198            Err(e) => {
2199                panic!("Query failed: {e}");
2200            }
2201        }
2202
2203        // Test 2: Numeric contains (should work with type coercion)
2204        println!("\n--- Test 2: price.Contains('9') ---");
2205        let result = engine.execute(
2206            table.clone(),
2207            "SELECT * FROM test WHERE price.Contains('9')",
2208        );
2209        match result {
2210            Ok(view) => {
2211                println!(
2212                    "SUCCESS: Found {} matching rows with price containing '9'",
2213                    view.row_count()
2214                );
2215                // Should find 99.99 row
2216                assert!(view.row_count() >= 1);
2217            }
2218            Err(e) => {
2219                panic!("Numeric coercion query failed: {e}");
2220            }
2221        }
2222
2223        println!("\n=== All tests passed! ===");
2224    }
2225
2226    #[test]
2227    fn test_not_in_clause() {
2228        // Initialize tracing for debug output
2229        let _ = tracing_subscriber::fmt()
2230            .with_max_level(tracing::Level::DEBUG)
2231            .try_init();
2232
2233        let mut table = DataTable::new("test");
2234        table.add_column(DataColumn::new("id"));
2235        table.add_column(DataColumn::new("country"));
2236
2237        // Add test data
2238        table
2239            .add_row(DataRow::new(vec![
2240                DataValue::Integer(1),
2241                DataValue::String("CA".to_string()),
2242            ]))
2243            .unwrap();
2244
2245        table
2246            .add_row(DataRow::new(vec![
2247                DataValue::Integer(2),
2248                DataValue::String("US".to_string()),
2249            ]))
2250            .unwrap();
2251
2252        table
2253            .add_row(DataRow::new(vec![
2254                DataValue::Integer(3),
2255                DataValue::String("UK".to_string()),
2256            ]))
2257            .unwrap();
2258
2259        let table = Arc::new(table);
2260        let engine = QueryEngine::new();
2261
2262        println!("\n=== Testing NOT IN clause ===");
2263        println!("Table has {} rows", table.row_count());
2264        for i in 0..table.row_count() {
2265            let country = table.get_value(i, 1);
2266            println!("Row {i}: country = {country:?}");
2267        }
2268
2269        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
2270        println!("\n--- Test: country NOT IN ('CA') ---");
2271        let result = engine.execute(
2272            table.clone(),
2273            "SELECT * FROM test WHERE country NOT IN ('CA')",
2274        );
2275        match result {
2276            Ok(view) => {
2277                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
2278                assert_eq!(view.row_count(), 2); // Should find US and UK
2279            }
2280            Err(e) => {
2281                panic!("NOT IN query failed: {e}");
2282            }
2283        }
2284
2285        println!("\n=== NOT IN test complete! ===");
2286    }
2287
2288    #[test]
2289    fn test_case_insensitive_in_and_not_in() {
2290        // Initialize tracing for debug output
2291        let _ = tracing_subscriber::fmt()
2292            .with_max_level(tracing::Level::DEBUG)
2293            .try_init();
2294
2295        let mut table = DataTable::new("test");
2296        table.add_column(DataColumn::new("id"));
2297        table.add_column(DataColumn::new("country"));
2298
2299        // Add test data with mixed case
2300        table
2301            .add_row(DataRow::new(vec![
2302                DataValue::Integer(1),
2303                DataValue::String("CA".to_string()), // uppercase
2304            ]))
2305            .unwrap();
2306
2307        table
2308            .add_row(DataRow::new(vec![
2309                DataValue::Integer(2),
2310                DataValue::String("us".to_string()), // lowercase
2311            ]))
2312            .unwrap();
2313
2314        table
2315            .add_row(DataRow::new(vec![
2316                DataValue::Integer(3),
2317                DataValue::String("UK".to_string()), // uppercase
2318            ]))
2319            .unwrap();
2320
2321        let table = Arc::new(table);
2322
2323        println!("\n=== Testing Case-Insensitive IN clause ===");
2324        println!("Table has {} rows", table.row_count());
2325        for i in 0..table.row_count() {
2326            let country = table.get_value(i, 1);
2327            println!("Row {i}: country = {country:?}");
2328        }
2329
2330        // Test case-insensitive IN - should match 'CA' with 'ca'
2331        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
2332        let engine = QueryEngine::with_case_insensitive(true);
2333        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2334        match result {
2335            Ok(view) => {
2336                println!(
2337                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
2338                    view.row_count()
2339                );
2340                assert_eq!(view.row_count(), 1); // Should find CA row
2341            }
2342            Err(e) => {
2343                panic!("Case-insensitive IN query failed: {e}");
2344            }
2345        }
2346
2347        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
2348        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
2349        let result = engine.execute(
2350            table.clone(),
2351            "SELECT * FROM test WHERE country NOT IN ('ca')",
2352        );
2353        match result {
2354            Ok(view) => {
2355                println!(
2356                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
2357                    view.row_count()
2358                );
2359                assert_eq!(view.row_count(), 2); // Should find us and UK rows
2360            }
2361            Err(e) => {
2362                panic!("Case-insensitive NOT IN query failed: {e}");
2363            }
2364        }
2365
2366        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
2367        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
2368        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
2369        let result = engine_case_sensitive
2370            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2371        match result {
2372            Ok(view) => {
2373                println!(
2374                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
2375                    view.row_count()
2376                );
2377                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
2378            }
2379            Err(e) => {
2380                panic!("Case-sensitive IN query failed: {e}");
2381            }
2382        }
2383
2384        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
2385    }
2386
2387    #[test]
2388    #[ignore = "Parentheses in WHERE clause not yet implemented"]
2389    fn test_parentheses_in_where_clause() {
2390        // Initialize tracing for debug output
2391        let _ = tracing_subscriber::fmt()
2392            .with_max_level(tracing::Level::DEBUG)
2393            .try_init();
2394
2395        let mut table = DataTable::new("test");
2396        table.add_column(DataColumn::new("id"));
2397        table.add_column(DataColumn::new("status"));
2398        table.add_column(DataColumn::new("priority"));
2399
2400        // Add test data
2401        table
2402            .add_row(DataRow::new(vec![
2403                DataValue::Integer(1),
2404                DataValue::String("Pending".to_string()),
2405                DataValue::String("High".to_string()),
2406            ]))
2407            .unwrap();
2408
2409        table
2410            .add_row(DataRow::new(vec![
2411                DataValue::Integer(2),
2412                DataValue::String("Complete".to_string()),
2413                DataValue::String("High".to_string()),
2414            ]))
2415            .unwrap();
2416
2417        table
2418            .add_row(DataRow::new(vec![
2419                DataValue::Integer(3),
2420                DataValue::String("Pending".to_string()),
2421                DataValue::String("Low".to_string()),
2422            ]))
2423            .unwrap();
2424
2425        table
2426            .add_row(DataRow::new(vec![
2427                DataValue::Integer(4),
2428                DataValue::String("Complete".to_string()),
2429                DataValue::String("Low".to_string()),
2430            ]))
2431            .unwrap();
2432
2433        let table = Arc::new(table);
2434        let engine = QueryEngine::new();
2435
2436        println!("\n=== Testing Parentheses in WHERE clause ===");
2437        println!("Table has {} rows", table.row_count());
2438        for i in 0..table.row_count() {
2439            let status = table.get_value(i, 1);
2440            let priority = table.get_value(i, 2);
2441            println!("Row {i}: status = {status:?}, priority = {priority:?}");
2442        }
2443
2444        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
2445        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2446        let result = engine.execute(
2447            table.clone(),
2448            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2449        );
2450        match result {
2451            Ok(view) => {
2452                println!(
2453                    "SUCCESS: Found {} rows with parenthetical logic",
2454                    view.row_count()
2455                );
2456                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
2457            }
2458            Err(e) => {
2459                panic!("Parentheses query failed: {e}");
2460            }
2461        }
2462
2463        println!("\n=== Parentheses test complete! ===");
2464    }
2465
2466    #[test]
2467    #[ignore = "Numeric type coercion needs fixing"]
2468    fn test_numeric_type_coercion() {
2469        // Initialize tracing for debug output
2470        let _ = tracing_subscriber::fmt()
2471            .with_max_level(tracing::Level::DEBUG)
2472            .try_init();
2473
2474        let mut table = DataTable::new("test");
2475        table.add_column(DataColumn::new("id"));
2476        table.add_column(DataColumn::new("price"));
2477        table.add_column(DataColumn::new("quantity"));
2478
2479        // Add test data with different numeric types
2480        table
2481            .add_row(DataRow::new(vec![
2482                DataValue::Integer(1),
2483                DataValue::Float(99.50), // Contains '.'
2484                DataValue::Integer(100),
2485            ]))
2486            .unwrap();
2487
2488        table
2489            .add_row(DataRow::new(vec![
2490                DataValue::Integer(2),
2491                DataValue::Float(150.0), // Contains '.' and '0'
2492                DataValue::Integer(200),
2493            ]))
2494            .unwrap();
2495
2496        table
2497            .add_row(DataRow::new(vec![
2498                DataValue::Integer(3),
2499                DataValue::Integer(75), // No decimal point
2500                DataValue::Integer(50),
2501            ]))
2502            .unwrap();
2503
2504        let table = Arc::new(table);
2505        let engine = QueryEngine::new();
2506
2507        println!("\n=== Testing Numeric Type Coercion ===");
2508        println!("Table has {} rows", table.row_count());
2509        for i in 0..table.row_count() {
2510            let price = table.get_value(i, 1);
2511            let quantity = table.get_value(i, 2);
2512            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
2513        }
2514
2515        // Test Contains on float values - should find rows with decimal points
2516        println!("\n--- Test: price.Contains('.') ---");
2517        let result = engine.execute(
2518            table.clone(),
2519            "SELECT * FROM test WHERE price.Contains('.')",
2520        );
2521        match result {
2522            Ok(view) => {
2523                println!(
2524                    "SUCCESS: Found {} rows with decimal points in price",
2525                    view.row_count()
2526                );
2527                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
2528            }
2529            Err(e) => {
2530                panic!("Numeric Contains query failed: {e}");
2531            }
2532        }
2533
2534        // Test Contains on integer values converted to string
2535        println!("\n--- Test: quantity.Contains('0') ---");
2536        let result = engine.execute(
2537            table.clone(),
2538            "SELECT * FROM test WHERE quantity.Contains('0')",
2539        );
2540        match result {
2541            Ok(view) => {
2542                println!(
2543                    "SUCCESS: Found {} rows with '0' in quantity",
2544                    view.row_count()
2545                );
2546                assert_eq!(view.row_count(), 2); // Should find 100 and 200
2547            }
2548            Err(e) => {
2549                panic!("Integer Contains query failed: {e}");
2550            }
2551        }
2552
2553        println!("\n=== Numeric type coercion test complete! ===");
2554    }
2555
2556    #[test]
2557    fn test_datetime_comparisons() {
2558        // Initialize tracing for debug output
2559        let _ = tracing_subscriber::fmt()
2560            .with_max_level(tracing::Level::DEBUG)
2561            .try_init();
2562
2563        let mut table = DataTable::new("test");
2564        table.add_column(DataColumn::new("id"));
2565        table.add_column(DataColumn::new("created_date"));
2566
2567        // Add test data with date strings (as they would come from CSV)
2568        table
2569            .add_row(DataRow::new(vec![
2570                DataValue::Integer(1),
2571                DataValue::String("2024-12-15".to_string()),
2572            ]))
2573            .unwrap();
2574
2575        table
2576            .add_row(DataRow::new(vec![
2577                DataValue::Integer(2),
2578                DataValue::String("2025-01-15".to_string()),
2579            ]))
2580            .unwrap();
2581
2582        table
2583            .add_row(DataRow::new(vec![
2584                DataValue::Integer(3),
2585                DataValue::String("2025-02-15".to_string()),
2586            ]))
2587            .unwrap();
2588
2589        let table = Arc::new(table);
2590        let engine = QueryEngine::new();
2591
2592        println!("\n=== Testing DateTime Comparisons ===");
2593        println!("Table has {} rows", table.row_count());
2594        for i in 0..table.row_count() {
2595            let date = table.get_value(i, 1);
2596            println!("Row {i}: created_date = {date:?}");
2597        }
2598
2599        // Test DateTime constructor comparison - should find dates after 2025-01-01
2600        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2601        let result = engine.execute(
2602            table.clone(),
2603            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2604        );
2605        match result {
2606            Ok(view) => {
2607                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2608                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
2609            }
2610            Err(e) => {
2611                panic!("DateTime comparison query failed: {e}");
2612            }
2613        }
2614
2615        println!("\n=== DateTime comparison test complete! ===");
2616    }
2617
2618    #[test]
2619    fn test_not_with_method_calls() {
2620        // Initialize tracing for debug output
2621        let _ = tracing_subscriber::fmt()
2622            .with_max_level(tracing::Level::DEBUG)
2623            .try_init();
2624
2625        let mut table = DataTable::new("test");
2626        table.add_column(DataColumn::new("id"));
2627        table.add_column(DataColumn::new("status"));
2628
2629        // Add test data
2630        table
2631            .add_row(DataRow::new(vec![
2632                DataValue::Integer(1),
2633                DataValue::String("Pending Review".to_string()),
2634            ]))
2635            .unwrap();
2636
2637        table
2638            .add_row(DataRow::new(vec![
2639                DataValue::Integer(2),
2640                DataValue::String("Complete".to_string()),
2641            ]))
2642            .unwrap();
2643
2644        table
2645            .add_row(DataRow::new(vec![
2646                DataValue::Integer(3),
2647                DataValue::String("Pending Approval".to_string()),
2648            ]))
2649            .unwrap();
2650
2651        let table = Arc::new(table);
2652        let engine = QueryEngine::with_case_insensitive(true);
2653
2654        println!("\n=== Testing NOT with Method Calls ===");
2655        println!("Table has {} rows", table.row_count());
2656        for i in 0..table.row_count() {
2657            let status = table.get_value(i, 1);
2658            println!("Row {i}: status = {status:?}");
2659        }
2660
2661        // Test NOT with Contains - should exclude rows containing "pend"
2662        println!("\n--- Test: NOT status.Contains('pend') ---");
2663        let result = engine.execute(
2664            table.clone(),
2665            "SELECT * FROM test WHERE NOT status.Contains('pend')",
2666        );
2667        match result {
2668            Ok(view) => {
2669                println!(
2670                    "SUCCESS: Found {} rows NOT containing 'pend'",
2671                    view.row_count()
2672                );
2673                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2674            }
2675            Err(e) => {
2676                panic!("NOT Contains query failed: {e}");
2677            }
2678        }
2679
2680        // Test NOT with StartsWith
2681        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2682        let result = engine.execute(
2683            table.clone(),
2684            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2685        );
2686        match result {
2687            Ok(view) => {
2688                println!(
2689                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
2690                    view.row_count()
2691                );
2692                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2693            }
2694            Err(e) => {
2695                panic!("NOT StartsWith query failed: {e}");
2696            }
2697        }
2698
2699        println!("\n=== NOT with method calls test complete! ===");
2700    }
2701
2702    #[test]
2703    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2704    fn test_complex_logical_expressions() {
2705        // Initialize tracing for debug output
2706        let _ = tracing_subscriber::fmt()
2707            .with_max_level(tracing::Level::DEBUG)
2708            .try_init();
2709
2710        let mut table = DataTable::new("test");
2711        table.add_column(DataColumn::new("id"));
2712        table.add_column(DataColumn::new("status"));
2713        table.add_column(DataColumn::new("priority"));
2714        table.add_column(DataColumn::new("assigned"));
2715
2716        // Add comprehensive test data
2717        table
2718            .add_row(DataRow::new(vec![
2719                DataValue::Integer(1),
2720                DataValue::String("Pending".to_string()),
2721                DataValue::String("High".to_string()),
2722                DataValue::String("John".to_string()),
2723            ]))
2724            .unwrap();
2725
2726        table
2727            .add_row(DataRow::new(vec![
2728                DataValue::Integer(2),
2729                DataValue::String("Complete".to_string()),
2730                DataValue::String("High".to_string()),
2731                DataValue::String("Jane".to_string()),
2732            ]))
2733            .unwrap();
2734
2735        table
2736            .add_row(DataRow::new(vec![
2737                DataValue::Integer(3),
2738                DataValue::String("Pending".to_string()),
2739                DataValue::String("Low".to_string()),
2740                DataValue::String("John".to_string()),
2741            ]))
2742            .unwrap();
2743
2744        table
2745            .add_row(DataRow::new(vec![
2746                DataValue::Integer(4),
2747                DataValue::String("In Progress".to_string()),
2748                DataValue::String("Medium".to_string()),
2749                DataValue::String("Jane".to_string()),
2750            ]))
2751            .unwrap();
2752
2753        let table = Arc::new(table);
2754        let engine = QueryEngine::new();
2755
2756        println!("\n=== Testing Complex Logical Expressions ===");
2757        println!("Table has {} rows", table.row_count());
2758        for i in 0..table.row_count() {
2759            let status = table.get_value(i, 1);
2760            let priority = table.get_value(i, 2);
2761            let assigned = table.get_value(i, 3);
2762            println!(
2763                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2764            );
2765        }
2766
2767        // Test complex AND/OR logic
2768        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2769        let result = engine.execute(
2770            table.clone(),
2771            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2772        );
2773        match result {
2774            Ok(view) => {
2775                println!(
2776                    "SUCCESS: Found {} rows with complex logic",
2777                    view.row_count()
2778                );
2779                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2780            }
2781            Err(e) => {
2782                panic!("Complex logic query failed: {e}");
2783            }
2784        }
2785
2786        // Test NOT with complex expressions
2787        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2788        let result = engine.execute(
2789            table.clone(),
2790            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2791        );
2792        match result {
2793            Ok(view) => {
2794                println!(
2795                    "SUCCESS: Found {} rows with NOT complex logic",
2796                    view.row_count()
2797                );
2798                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2799            }
2800            Err(e) => {
2801                panic!("NOT complex logic query failed: {e}");
2802            }
2803        }
2804
2805        println!("\n=== Complex logical expressions test complete! ===");
2806    }
2807
2808    #[test]
2809    fn test_mixed_data_types_and_edge_cases() {
2810        // Initialize tracing for debug output
2811        let _ = tracing_subscriber::fmt()
2812            .with_max_level(tracing::Level::DEBUG)
2813            .try_init();
2814
2815        let mut table = DataTable::new("test");
2816        table.add_column(DataColumn::new("id"));
2817        table.add_column(DataColumn::new("value"));
2818        table.add_column(DataColumn::new("nullable_field"));
2819
2820        // Add test data with mixed types and edge cases
2821        table
2822            .add_row(DataRow::new(vec![
2823                DataValue::Integer(1),
2824                DataValue::String("123.45".to_string()),
2825                DataValue::String("present".to_string()),
2826            ]))
2827            .unwrap();
2828
2829        table
2830            .add_row(DataRow::new(vec![
2831                DataValue::Integer(2),
2832                DataValue::Float(678.90),
2833                DataValue::Null,
2834            ]))
2835            .unwrap();
2836
2837        table
2838            .add_row(DataRow::new(vec![
2839                DataValue::Integer(3),
2840                DataValue::Boolean(true),
2841                DataValue::String("also present".to_string()),
2842            ]))
2843            .unwrap();
2844
2845        table
2846            .add_row(DataRow::new(vec![
2847                DataValue::Integer(4),
2848                DataValue::String("false".to_string()),
2849                DataValue::Null,
2850            ]))
2851            .unwrap();
2852
2853        let table = Arc::new(table);
2854        let engine = QueryEngine::new();
2855
2856        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2857        println!("Table has {} rows", table.row_count());
2858        for i in 0..table.row_count() {
2859            let value = table.get_value(i, 1);
2860            let nullable = table.get_value(i, 2);
2861            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2862        }
2863
2864        // Test type coercion with boolean Contains
2865        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2866        let result = engine.execute(
2867            table.clone(),
2868            "SELECT * FROM test WHERE value.Contains('true')",
2869        );
2870        match result {
2871            Ok(view) => {
2872                println!(
2873                    "SUCCESS: Found {} rows with boolean coercion",
2874                    view.row_count()
2875                );
2876                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2877            }
2878            Err(e) => {
2879                panic!("Boolean coercion query failed: {e}");
2880            }
2881        }
2882
2883        // Test multiple IN values with mixed types
2884        println!("\n--- Test: id IN (1, 3) ---");
2885        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2886        match result {
2887            Ok(view) => {
2888                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2889                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2890            }
2891            Err(e) => {
2892                panic!("Multiple IN values query failed: {e}");
2893            }
2894        }
2895
2896        println!("\n=== Mixed data types test complete! ===");
2897    }
2898
2899    /// Test that aggregate-only queries return exactly one row (regression test)
2900    #[test]
2901    fn test_aggregate_only_single_row() {
2902        let table = create_test_stock_data();
2903        let engine = QueryEngine::new();
2904
2905        // Test query with multiple aggregates - should return exactly 1 row
2906        let result = engine
2907            .execute(
2908                table.clone(),
2909                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2910            )
2911            .expect("Query should succeed");
2912
2913        assert_eq!(
2914            result.row_count(),
2915            1,
2916            "Aggregate-only query should return exactly 1 row"
2917        );
2918        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2919
2920        // Verify the actual values are correct
2921        let source = result.source();
2922        let row = source.get_row(0).expect("Should have first row");
2923
2924        // COUNT(*) should be 5 (total rows)
2925        assert_eq!(row.values[0], DataValue::Integer(5));
2926
2927        // MIN should be 99.5
2928        assert_eq!(row.values[1], DataValue::Float(99.5));
2929
2930        // MAX should be 105.0
2931        assert_eq!(row.values[2], DataValue::Float(105.0));
2932
2933        // AVG should be approximately 102.4
2934        if let DataValue::Float(avg) = &row.values[3] {
2935            assert!(
2936                (avg - 102.4).abs() < 0.01,
2937                "Average should be approximately 102.4, got {}",
2938                avg
2939            );
2940        } else {
2941            panic!("AVG should return a Float value");
2942        }
2943    }
2944
2945    /// Test single aggregate function returns single row
2946    #[test]
2947    fn test_single_aggregate_single_row() {
2948        let table = create_test_stock_data();
2949        let engine = QueryEngine::new();
2950
2951        let result = engine
2952            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2953            .expect("Query should succeed");
2954
2955        assert_eq!(
2956            result.row_count(),
2957            1,
2958            "Single aggregate query should return exactly 1 row"
2959        );
2960        assert_eq!(result.column_count(), 1, "Should have 1 column");
2961
2962        let source = result.source();
2963        let row = source.get_row(0).expect("Should have first row");
2964        assert_eq!(row.values[0], DataValue::Integer(5));
2965    }
2966
2967    /// Test aggregate with WHERE clause filtering
2968    #[test]
2969    fn test_aggregate_with_where_single_row() {
2970        let table = create_test_stock_data();
2971        let engine = QueryEngine::new();
2972
2973        // Filter to only high-value stocks (>= 103.0) and aggregate
2974        let result = engine
2975            .execute(
2976                table.clone(),
2977                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2978            )
2979            .expect("Query should succeed");
2980
2981        assert_eq!(
2982            result.row_count(),
2983            1,
2984            "Filtered aggregate query should return exactly 1 row"
2985        );
2986        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2987
2988        let source = result.source();
2989        let row = source.get_row(0).expect("Should have first row");
2990
2991        // Should find 2 rows (103.5 and 105.0)
2992        assert_eq!(row.values[0], DataValue::Integer(2));
2993        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2994        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2995    }
2996
2997    #[test]
2998    fn test_not_in_parsing() {
2999        use crate::sql::recursive_parser::Parser;
3000
3001        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3002        println!("\n=== Testing NOT IN parsing ===");
3003        println!("Parsing query: {query}");
3004
3005        let mut parser = Parser::new(query);
3006        match parser.parse() {
3007            Ok(statement) => {
3008                println!("Parsed statement: {statement:#?}");
3009                if let Some(where_clause) = statement.where_clause {
3010                    println!("WHERE conditions: {:#?}", where_clause.conditions);
3011                    if let Some(first_condition) = where_clause.conditions.first() {
3012                        println!("First condition expression: {:#?}", first_condition.expr);
3013                    }
3014                }
3015            }
3016            Err(e) => {
3017                panic!("Parse error: {e}");
3018            }
3019        }
3020    }
3021
3022    /// Create test stock data for aggregate testing
3023    fn create_test_stock_data() -> Arc<DataTable> {
3024        let mut table = DataTable::new("stock");
3025
3026        table.add_column(DataColumn::new("symbol"));
3027        table.add_column(DataColumn::new("close"));
3028        table.add_column(DataColumn::new("volume"));
3029
3030        // Add 5 rows of test data
3031        let test_data = vec![
3032            ("AAPL", 99.5, 1000),
3033            ("AAPL", 101.2, 1500),
3034            ("AAPL", 103.5, 2000),
3035            ("AAPL", 105.0, 1200),
3036            ("AAPL", 102.8, 1800),
3037        ];
3038
3039        for (symbol, close, volume) in test_data {
3040            table
3041                .add_row(DataRow::new(vec![
3042                    DataValue::String(symbol.to_string()),
3043                    DataValue::Float(close),
3044                    DataValue::Integer(volume),
3045                ]))
3046                .expect("Should add row successfully");
3047        }
3048
3049        Arc::new(table)
3050    }
3051}
3052
3053#[cfg(test)]
3054#[path = "query_engine_tests.rs"]
3055mod query_engine_tests;