sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::{GroupByExpressions, GroupByPhaseInfo};
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26    TableFunction,
27};
28
29/// Query engine that executes SQL directly on `DataTable`
30#[derive(Clone)]
31pub struct QueryEngine {
32    case_insensitive: bool,
33    date_notation: String,
34    behavior_config: Option<BehaviorConfig>,
35}
36
37impl Default for QueryEngine {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl QueryEngine {
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            case_insensitive: false,
48            date_notation: get_date_notation(),
49            behavior_config: None,
50        }
51    }
52
53    #[must_use]
54    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55        let case_insensitive = config.case_insensitive_default;
56        // Use get_date_notation() to respect environment variable override
57        let date_notation = get_date_notation();
58        Self {
59            case_insensitive,
60            date_notation,
61            behavior_config: Some(config),
62        }
63    }
64
65    #[must_use]
66    pub fn with_date_notation(_date_notation: String) -> Self {
67        Self {
68            case_insensitive: false,
69            date_notation: get_date_notation(), // Always use the global function
70            behavior_config: None,
71        }
72    }
73
74    #[must_use]
75    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76        Self {
77            case_insensitive,
78            date_notation: get_date_notation(),
79            behavior_config: None,
80        }
81    }
82
83    #[must_use]
84    pub fn with_case_insensitive_and_date_notation(
85        case_insensitive: bool,
86        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
87    ) -> Self {
88        Self {
89            case_insensitive,
90            date_notation: get_date_notation(), // Always use the global function
91            behavior_config: None,
92        }
93    }
94
95    /// Find a column name similar to the given name using edit distance
96    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97        let columns = table.column_names();
98        let mut best_match: Option<(String, usize)> = None;
99
100        for col in columns {
101            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102            // Only suggest if distance is small (likely a typo)
103            // Allow up to 3 edits for longer names
104            let max_distance = if name.len() > 10 { 3 } else { 2 };
105            if distance <= max_distance {
106                match &best_match {
107                    None => best_match = Some((col, distance)),
108                    Some((_, best_dist)) if distance < *best_dist => {
109                        best_match = Some((col, distance));
110                    }
111                    _ => {}
112                }
113            }
114        }
115
116        best_match.map(|(name, _)| name)
117    }
118
119    /// Calculate Levenshtein edit distance between two strings
120    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121        let len1 = s1.len();
122        let len2 = s2.len();
123        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125        for i in 0..=len1 {
126            matrix[i][0] = i;
127        }
128        for j in 0..=len2 {
129            matrix[0][j] = j;
130        }
131
132        for (i, c1) in s1.chars().enumerate() {
133            for (j, c2) in s2.chars().enumerate() {
134                let cost = usize::from(c1 != c2);
135                matrix[i + 1][j + 1] = std::cmp::min(
136                    matrix[i][j + 1] + 1, // deletion
137                    std::cmp::min(
138                        matrix[i + 1][j] + 1, // insertion
139                        matrix[i][j] + cost,  // substitution
140                    ),
141                );
142            }
143        }
144
145        matrix[len1][len2]
146    }
147
148    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
149    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150        let (view, _plan) = self.execute_with_plan(table, sql)?;
151        Ok(view)
152    }
153
154    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
155    pub fn execute_statement(
156        &self,
157        table: Arc<DataTable>,
158        statement: SelectStatement,
159    ) -> Result<DataView> {
160        // First process CTEs to build context
161        let mut cte_context = HashMap::new();
162        for cte in &statement.ctes {
163            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164            // Execute the CTE based on its type
165            let cte_result = match &cte.cte_type {
166                CTEType::Standard(query) => {
167                    // Execute the CTE query (it might reference earlier CTEs)
168                    let view = self.build_view_with_context(
169                        table.clone(),
170                        query.clone(),
171                        &mut cte_context,
172                    )?;
173
174                    // Materialize the view and enrich columns with qualified names
175                    let mut materialized = self.materialize_view(view)?;
176
177                    // Enrich columns with qualified names for proper scoping
178                    for column in materialized.columns_mut() {
179                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
180                        column.source_table = Some(cte.name.clone());
181                    }
182
183                    DataView::new(Arc::new(materialized))
184                }
185                CTEType::Web(web_spec) => {
186                    // Fetch data from URL
187                    use crate::web::http_fetcher::WebDataFetcher;
188
189                    let fetcher = WebDataFetcher::new()?;
190                    let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
191
192                    // Enrich columns with qualified names for proper scoping
193                    for column in data_table.columns_mut() {
194                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195                        column.source_table = Some(cte.name.clone());
196                    }
197
198                    // Convert DataTable to DataView
199                    DataView::new(Arc::new(data_table))
200                }
201            };
202            // Store the result in the context for later use
203            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204            debug!(
205                "QueryEngine: CTE '{}' pre-processed, stored in context",
206                cte.name
207            );
208        }
209
210        // Now process subqueries with CTE context available
211        let mut subquery_executor =
212            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215        // Build the view with the same CTE context
216        self.build_view_with_context(table, processed_statement, &mut cte_context)
217    }
218
219    /// Execute a statement with provided CTE context (for subqueries)
220    pub fn execute_statement_with_cte_context(
221        &self,
222        table: Arc<DataTable>,
223        statement: SelectStatement,
224        cte_context: &HashMap<String, Arc<DataView>>,
225    ) -> Result<DataView> {
226        // Clone the context so we can add any CTEs from this statement
227        let mut local_context = cte_context.clone();
228
229        // Process any CTEs in this statement (they might be nested)
230        for cte in &statement.ctes {
231            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232            let cte_result = match &cte.cte_type {
233                CTEType::Standard(query) => {
234                    let view = self.build_view_with_context(
235                        table.clone(),
236                        query.clone(),
237                        &mut local_context,
238                    )?;
239
240                    // Materialize the view and enrich columns with qualified names
241                    let mut materialized = self.materialize_view(view)?;
242
243                    // Enrich columns with qualified names for proper scoping
244                    for column in materialized.columns_mut() {
245                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246                        column.source_table = Some(cte.name.clone());
247                    }
248
249                    DataView::new(Arc::new(materialized))
250                }
251                CTEType::Web(web_spec) => {
252                    // Fetch data from URL
253                    use crate::web::http_fetcher::WebDataFetcher;
254
255                    let fetcher = WebDataFetcher::new()?;
256                    let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
257
258                    // Enrich columns with qualified names for proper scoping
259                    for column in data_table.columns_mut() {
260                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261                        column.source_table = Some(cte.name.clone());
262                    }
263
264                    // Convert DataTable to DataView
265                    DataView::new(Arc::new(data_table))
266                }
267            };
268            local_context.insert(cte.name.clone(), Arc::new(cte_result));
269        }
270
271        // Process subqueries with the complete context
272        let mut subquery_executor =
273            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
274        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
275
276        // Build the view
277        self.build_view_with_context(table, processed_statement, &mut local_context)
278    }
279
280    /// Execute a query and return both the result and the execution plan
281    pub fn execute_with_plan(
282        &self,
283        table: Arc<DataTable>,
284        sql: &str,
285    ) -> Result<(DataView, ExecutionPlan)> {
286        let mut plan_builder = ExecutionPlanBuilder::new();
287        let start_time = Instant::now();
288
289        // Parse the SQL query
290        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
291        plan_builder.add_detail(format!("Query: {}", sql));
292        let mut parser = Parser::new(sql);
293        let statement = parser
294            .parse()
295            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
296        plan_builder.add_detail(format!("Parsed successfully"));
297        if let Some(from) = &statement.from_table {
298            plan_builder.add_detail(format!("FROM: {}", from));
299        }
300        if statement.where_clause.is_some() {
301            plan_builder.add_detail("WHERE clause present".to_string());
302        }
303        plan_builder.end_step();
304
305        // First process CTEs to build context
306        let mut cte_context = HashMap::new();
307
308        if !statement.ctes.is_empty() {
309            plan_builder.begin_step(
310                StepType::CTE,
311                format!("Process {} CTEs", statement.ctes.len()),
312            );
313
314            for cte in &statement.ctes {
315                let cte_start = Instant::now();
316                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
317
318                let cte_result = match &cte.cte_type {
319                    CTEType::Standard(query) => {
320                        // Add CTE query details
321                        if let Some(from) = &query.from_table {
322                            plan_builder.add_detail(format!("Source: {}", from));
323                        }
324                        if query.where_clause.is_some() {
325                            plan_builder.add_detail("Has WHERE clause".to_string());
326                        }
327                        if query.group_by.is_some() {
328                            plan_builder.add_detail("Has GROUP BY".to_string());
329                        }
330
331                        debug!(
332                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
333                            cte.name,
334                            cte_context.keys().collect::<Vec<_>>()
335                        );
336
337                        // Process subqueries in the CTE's query FIRST
338                        // This allows the subqueries to see all previously defined CTEs
339                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
340                            self.clone(),
341                            table.clone(),
342                            cte_context.clone(),
343                        );
344                        let processed_query = subquery_executor.execute_subqueries(query)?;
345
346                        let view = self.build_view_with_context(
347                            table.clone(),
348                            processed_query,
349                            &mut cte_context,
350                        )?;
351
352                        // Materialize the view and enrich columns with qualified names
353                        let mut materialized = self.materialize_view(view)?;
354
355                        // Enrich columns with qualified names for proper scoping
356                        for column in materialized.columns_mut() {
357                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
358                            column.source_table = Some(cte.name.clone());
359                        }
360
361                        DataView::new(Arc::new(materialized))
362                    }
363                    CTEType::Web(web_spec) => {
364                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
365                        if let Some(format) = &web_spec.format {
366                            plan_builder.add_detail(format!("Format: {:?}", format));
367                        }
368                        if let Some(cache) = web_spec.cache_seconds {
369                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
370                        }
371
372                        // Fetch data from URL
373                        use crate::web::http_fetcher::WebDataFetcher;
374
375                        let fetcher = WebDataFetcher::new()?;
376                        let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
377
378                        // Enrich columns with qualified names for proper scoping
379                        for column in data_table.columns_mut() {
380                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
381                            column.source_table = Some(cte.name.clone());
382                        }
383
384                        // Convert DataTable to DataView
385                        DataView::new(Arc::new(data_table))
386                    }
387                };
388
389                // Record CTE statistics
390                plan_builder.set_rows_out(cte_result.row_count());
391                plan_builder.add_detail(format!(
392                    "Result: {} rows, {} columns",
393                    cte_result.row_count(),
394                    cte_result.column_count()
395                ));
396                plan_builder.add_detail(format!(
397                    "Execution time: {:.3}ms",
398                    cte_start.elapsed().as_secs_f64() * 1000.0
399                ));
400
401                debug!(
402                    "QueryEngine: Storing CTE '{}' in context with {} rows",
403                    cte.name,
404                    cte_result.row_count()
405                );
406                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
407                plan_builder.end_step();
408            }
409
410            plan_builder.add_detail(format!(
411                "All {} CTEs cached in context",
412                statement.ctes.len()
413            ));
414            plan_builder.end_step();
415        }
416
417        // Process subqueries in the statement with CTE context
418        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
419        let mut subquery_executor =
420            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
421
422        // Check if there are subqueries to process
423        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
424            // This is a simplified check - in reality we'd need to walk the AST
425            format!("{:?}", w).contains("Subquery")
426        });
427
428        if has_subqueries {
429            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
430        }
431
432        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
433
434        if has_subqueries {
435            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
436        } else {
437            plan_builder.add_detail("No subqueries to process".to_string());
438        }
439
440        plan_builder.end_step();
441        let result = self.build_view_with_context_and_plan(
442            table,
443            processed_statement,
444            &mut cte_context,
445            &mut plan_builder,
446        )?;
447
448        let total_duration = start_time.elapsed();
449        info!(
450            "Query execution complete: total={:?}, rows={}",
451            total_duration,
452            result.row_count()
453        );
454
455        let plan = plan_builder.build();
456        Ok((result, plan))
457    }
458
459    /// Build a `DataView` from a parsed SQL statement
460    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
461        let mut cte_context = HashMap::new();
462        self.build_view_with_context(table, statement, &mut cte_context)
463    }
464
465    /// Build a DataView from a SelectStatement with CTE context
466    fn build_view_with_context(
467        &self,
468        table: Arc<DataTable>,
469        statement: SelectStatement,
470        cte_context: &mut HashMap<String, Arc<DataView>>,
471    ) -> Result<DataView> {
472        let mut dummy_plan = ExecutionPlanBuilder::new();
473        self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
474    }
475
476    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
477    fn build_view_with_context_and_plan(
478        &self,
479        table: Arc<DataTable>,
480        statement: SelectStatement,
481        cte_context: &mut HashMap<String, Arc<DataView>>,
482        plan: &mut ExecutionPlanBuilder,
483    ) -> Result<DataView> {
484        // First, process any CTEs that aren't already in the context
485        for cte in &statement.ctes {
486            // Skip if already processed (e.g., by execute_select for WEB CTEs)
487            if cte_context.contains_key(&cte.name) {
488                debug!(
489                    "QueryEngine: CTE '{}' already in context, skipping",
490                    cte.name
491                );
492                continue;
493            }
494
495            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
496            debug!(
497                "QueryEngine: Available CTEs for '{}': {:?}",
498                cte.name,
499                cte_context.keys().collect::<Vec<_>>()
500            );
501
502            // Execute the CTE query (it might reference earlier CTEs)
503            let cte_result = match &cte.cte_type {
504                CTEType::Standard(query) => {
505                    let view =
506                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
507
508                    // Materialize the view and enrich columns with qualified names
509                    let mut materialized = self.materialize_view(view)?;
510
511                    // Enrich columns with qualified names for proper scoping
512                    for column in materialized.columns_mut() {
513                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
514                        column.source_table = Some(cte.name.clone());
515                    }
516
517                    DataView::new(Arc::new(materialized))
518                }
519                CTEType::Web(_web_spec) => {
520                    // Web CTEs should have been processed earlier in execute_select
521                    return Err(anyhow!(
522                        "Web CTEs should be processed in execute_select method"
523                    ));
524                }
525            };
526
527            // Store the result in the context for later use
528            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
529            debug!(
530                "QueryEngine: CTE '{}' processed, stored in context",
531                cte.name
532            );
533        }
534
535        // Determine the source table for the main query
536        let source_table = if let Some(ref table_func) = statement.from_function {
537            // Handle table functions like RANGE()
538            debug!("QueryEngine: Processing table function...");
539            match table_func {
540                TableFunction::Generator { name, args } => {
541                    // Use the generator registry to create the table
542                    use crate::sql::generators::GeneratorRegistry;
543
544                    // Create generator registry (could be cached in QueryEngine)
545                    let registry = GeneratorRegistry::new();
546
547                    if let Some(generator) = registry.get(name) {
548                        // Evaluate arguments
549                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
550                            &table,
551                            self.date_notation.clone(),
552                        );
553                        let dummy_row = 0;
554
555                        let mut evaluated_args = Vec::new();
556                        for arg in args {
557                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
558                        }
559
560                        // Generate the table
561                        generator.generate(evaluated_args)?
562                    } else {
563                        return Err(anyhow!("Unknown generator function: {}", name));
564                    }
565                }
566            }
567        } else if let Some(ref subquery) = statement.from_subquery {
568            // Execute the subquery and use its result as the source
569            debug!("QueryEngine: Processing FROM subquery...");
570            let subquery_result =
571                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
572
573            // Convert the DataView to a DataTable for use as source
574            // This materializes the subquery result
575            let materialized = self.materialize_view(subquery_result)?;
576            Arc::new(materialized)
577        } else if let Some(ref table_name) = statement.from_table {
578            // Check if this references a CTE
579            if let Some(cte_view) = cte_context.get(table_name) {
580                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
581                // Materialize the CTE view as a table
582                let mut materialized = self.materialize_view((**cte_view).clone())?;
583
584                // Apply alias to qualified column names if present
585                if let Some(ref alias) = statement.from_alias {
586                    debug!(
587                        "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
588                        alias, table_name
589                    );
590                    for column in materialized.columns_mut() {
591                        // Replace the CTE name with the alias in qualified names
592                        if let Some(ref qualified_name) = column.qualified_name {
593                            if qualified_name.starts_with(&format!("{}.", table_name)) {
594                                column.qualified_name =
595                                    Some(qualified_name.replace(
596                                        &format!("{}.", table_name),
597                                        &format!("{}.", alias),
598                                    ));
599                            }
600                        }
601                        // Update source table to reflect the alias
602                        if column.source_table.as_ref() == Some(table_name) {
603                            column.source_table = Some(alias.clone());
604                        }
605                    }
606                }
607
608                Arc::new(materialized)
609            } else {
610                // Regular table reference - use the provided table
611                table.clone()
612            }
613        } else {
614            // No FROM clause - use the provided table
615            table.clone()
616        };
617
618        // Process JOINs if present
619        let final_table = if !statement.joins.is_empty() {
620            plan.begin_step(
621                StepType::Join,
622                format!("Process {} JOINs", statement.joins.len()),
623            );
624            plan.set_rows_in(source_table.row_count());
625
626            let join_executor = HashJoinExecutor::new(self.case_insensitive);
627            let mut current_table = source_table;
628
629            for (idx, join_clause) in statement.joins.iter().enumerate() {
630                let join_start = Instant::now();
631                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
632                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
633                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
634                plan.add_detail(format!(
635                    "Executing {:?} JOIN on {} condition(s)",
636                    join_clause.join_type,
637                    join_clause.condition.conditions.len()
638                ));
639
640                // Resolve the right table for the join
641                let right_table = match &join_clause.table {
642                    TableSource::Table(name) => {
643                        // Check if it's a CTE reference
644                        if let Some(cte_view) = cte_context.get(name) {
645                            let mut materialized = self.materialize_view((**cte_view).clone())?;
646
647                            // Apply alias to qualified column names if present
648                            if let Some(ref alias) = join_clause.alias {
649                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
650                                for column in materialized.columns_mut() {
651                                    // Replace the CTE name with the alias in qualified names
652                                    if let Some(ref qualified_name) = column.qualified_name {
653                                        if qualified_name.starts_with(&format!("{}.", name)) {
654                                            column.qualified_name = Some(qualified_name.replace(
655                                                &format!("{}.", name),
656                                                &format!("{}.", alias),
657                                            ));
658                                        }
659                                    }
660                                    // Update source table to reflect the alias
661                                    if column.source_table.as_ref() == Some(name) {
662                                        column.source_table = Some(alias.clone());
663                                    }
664                                }
665                            }
666
667                            Arc::new(materialized)
668                        } else {
669                            // For now, we need the actual table data
670                            // In a real implementation, this would load from file
671                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
672                        }
673                    }
674                    TableSource::DerivedTable { query, alias: _ } => {
675                        // Execute the subquery
676                        let subquery_result = self.build_view_with_context(
677                            table.clone(),
678                            *query.clone(),
679                            cte_context,
680                        )?;
681                        let materialized = self.materialize_view(subquery_result)?;
682                        Arc::new(materialized)
683                    }
684                };
685
686                // Execute the join
687                let joined = join_executor.execute_join(
688                    current_table.clone(),
689                    join_clause,
690                    right_table.clone(),
691                )?;
692
693                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
694                plan.set_rows_out(joined.row_count());
695                plan.add_detail(format!("Result: {} rows", joined.row_count()));
696                plan.add_detail(format!(
697                    "Join time: {:.3}ms",
698                    join_start.elapsed().as_secs_f64() * 1000.0
699                ));
700                plan.end_step();
701
702                current_table = Arc::new(joined);
703            }
704
705            plan.set_rows_out(current_table.row_count());
706            plan.add_detail(format!(
707                "Final result after all joins: {} rows",
708                current_table.row_count()
709            ));
710            plan.end_step();
711            current_table
712        } else {
713            source_table
714        };
715
716        // Continue with the existing build_view logic but using final_table
717        self.build_view_internal_with_plan(final_table, statement, plan)
718    }
719
720    /// Materialize a DataView into a new DataTable
721    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
722        let source = view.source();
723        let mut result_table = DataTable::new("derived");
724
725        // Get the visible columns from the view
726        let visible_cols = view.visible_column_indices().to_vec();
727
728        // Copy column definitions
729        for col_idx in &visible_cols {
730            let col = &source.columns[*col_idx];
731            let new_col = DataColumn {
732                name: col.name.clone(),
733                data_type: col.data_type.clone(),
734                nullable: col.nullable,
735                unique_values: col.unique_values,
736                null_count: col.null_count,
737                metadata: col.metadata.clone(),
738                qualified_name: col.qualified_name.clone(), // Preserve qualified name
739                source_table: col.source_table.clone(),     // Preserve source table
740            };
741            result_table.add_column(new_col);
742        }
743
744        // Copy visible rows
745        for row_idx in view.visible_row_indices() {
746            let source_row = &source.rows[*row_idx];
747            let mut new_row = DataRow { values: Vec::new() };
748
749            for col_idx in &visible_cols {
750                new_row.values.push(source_row.values[*col_idx].clone());
751            }
752
753            result_table.add_row(new_row);
754        }
755
756        Ok(result_table)
757    }
758
759    fn build_view_internal(
760        &self,
761        table: Arc<DataTable>,
762        statement: SelectStatement,
763    ) -> Result<DataView> {
764        let mut dummy_plan = ExecutionPlanBuilder::new();
765        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
766    }
767
768    fn build_view_internal_with_plan(
769        &self,
770        table: Arc<DataTable>,
771        statement: SelectStatement,
772        plan: &mut ExecutionPlanBuilder,
773    ) -> Result<DataView> {
774        debug!(
775            "QueryEngine::build_view - select_items: {:?}",
776            statement.select_items
777        );
778        debug!(
779            "QueryEngine::build_view - where_clause: {:?}",
780            statement.where_clause
781        );
782
783        // Start with all rows visible
784        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
785
786        // Apply WHERE clause filtering using recursive evaluator
787        if let Some(where_clause) = &statement.where_clause {
788            let total_rows = table.row_count();
789            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
790            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
791
792            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
793            plan.set_rows_in(total_rows);
794            plan.add_detail(format!("Input: {} rows", total_rows));
795
796            // Add details about WHERE conditions
797            for condition in &where_clause.conditions {
798                plan.add_detail(format!("Condition: {:?}", condition.expr));
799            }
800
801            let filter_start = Instant::now();
802            // Create an evaluation context for caching compiled regexes
803            let mut eval_context = EvaluationContext::new(self.case_insensitive);
804
805            // Filter visible rows based on WHERE clause
806            let mut filtered_rows = Vec::new();
807            for row_idx in visible_rows {
808                // Only log for first few rows to avoid performance impact
809                if row_idx < 3 {
810                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
811                }
812                let mut evaluator =
813                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
814                match evaluator.evaluate(where_clause, row_idx) {
815                    Ok(result) => {
816                        if row_idx < 3 {
817                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
818                        }
819                        if result {
820                            filtered_rows.push(row_idx);
821                        }
822                    }
823                    Err(e) => {
824                        if row_idx < 3 {
825                            debug!(
826                                "QueryEngine: WHERE evaluation error for row {}: {}",
827                                row_idx, e
828                            );
829                        }
830                        // Propagate WHERE clause errors instead of silently ignoring them
831                        return Err(e);
832                    }
833                }
834            }
835
836            // Log regex cache statistics
837            let (compilations, cache_hits) = eval_context.get_stats();
838            if compilations > 0 || cache_hits > 0 {
839                debug!(
840                    "LIKE pattern cache: {} compilations, {} cache hits",
841                    compilations, cache_hits
842                );
843            }
844            visible_rows = filtered_rows;
845            let filter_duration = filter_start.elapsed();
846            info!(
847                "WHERE clause filtering: {} rows -> {} rows in {:?}",
848                total_rows,
849                visible_rows.len(),
850                filter_duration
851            );
852
853            plan.set_rows_out(visible_rows.len());
854            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
855            plan.add_detail(format!(
856                "Filter time: {:.3}ms",
857                filter_duration.as_secs_f64() * 1000.0
858            ));
859            plan.end_step();
860        }
861
862        // Create initial DataView with filtered rows
863        let mut view = DataView::new(table.clone());
864        view = view.with_rows(visible_rows);
865
866        // Handle GROUP BY if present
867        if let Some(group_by_exprs) = &statement.group_by {
868            if !group_by_exprs.is_empty() {
869                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
870
871                plan.begin_step(
872                    StepType::GroupBy,
873                    format!("GROUP BY {} expressions", group_by_exprs.len()),
874                );
875                plan.set_rows_in(view.row_count());
876                plan.add_detail(format!("Input: {} rows", view.row_count()));
877                for expr in group_by_exprs {
878                    plan.add_detail(format!("Group by: {:?}", expr));
879                }
880
881                let group_start = Instant::now();
882                view = self.apply_group_by(
883                    view,
884                    group_by_exprs,
885                    &statement.select_items,
886                    statement.having.as_ref(),
887                    plan,
888                )?;
889
890                plan.set_rows_out(view.row_count());
891                plan.add_detail(format!("Output: {} groups", view.row_count()));
892                plan.add_detail(format!(
893                    "Overall time: {:.3}ms",
894                    group_start.elapsed().as_secs_f64() * 1000.0
895                ));
896                plan.end_step();
897            }
898        } else {
899            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
900            if !statement.select_items.is_empty() {
901                // Check if we have ANY non-star items (not just the first one)
902                let has_non_star_items = statement
903                    .select_items
904                    .iter()
905                    .any(|item| !matches!(item, SelectItem::Star));
906
907                // Apply select items if:
908                // 1. We have computed expressions or explicit columns
909                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
910                if has_non_star_items || statement.select_items.len() > 1 {
911                    view = self.apply_select_items(view, &statement.select_items)?;
912                } else {
913                }
914                // If it's just a single star, no projection needed
915            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
916                debug!("QueryEngine: Using legacy columns path");
917                // Fallback to legacy column projection for backward compatibility
918                // Use the current view's source table, not the original table
919                let source_table = view.source();
920                let column_indices =
921                    self.resolve_column_indices(source_table, &statement.columns)?;
922                view = view.with_columns(column_indices);
923            }
924        }
925
926        // Apply DISTINCT if specified
927        if statement.distinct {
928            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
929            plan.set_rows_in(view.row_count());
930            plan.add_detail(format!("Input: {} rows", view.row_count()));
931
932            let distinct_start = Instant::now();
933            view = self.apply_distinct(view)?;
934
935            plan.set_rows_out(view.row_count());
936            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
937            plan.add_detail(format!(
938                "Distinct time: {:.3}ms",
939                distinct_start.elapsed().as_secs_f64() * 1000.0
940            ));
941            plan.end_step();
942        }
943
944        // Apply ORDER BY sorting
945        if let Some(order_by_columns) = &statement.order_by {
946            if !order_by_columns.is_empty() {
947                plan.begin_step(
948                    StepType::Sort,
949                    format!("ORDER BY {} columns", order_by_columns.len()),
950                );
951                plan.set_rows_in(view.row_count());
952                for col in order_by_columns {
953                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
954                }
955
956                let sort_start = Instant::now();
957                view = self.apply_multi_order_by(view, order_by_columns)?;
958
959                plan.add_detail(format!(
960                    "Sort time: {:.3}ms",
961                    sort_start.elapsed().as_secs_f64() * 1000.0
962                ));
963                plan.end_step();
964            }
965        }
966
967        // Apply LIMIT/OFFSET
968        if let Some(limit) = statement.limit {
969            let offset = statement.offset.unwrap_or(0);
970            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
971            plan.set_rows_in(view.row_count());
972            if offset > 0 {
973                plan.add_detail(format!("OFFSET: {}", offset));
974            }
975            view = view.with_limit(limit, offset);
976            plan.set_rows_out(view.row_count());
977            plan.add_detail(format!("Output: {} rows", view.row_count()));
978            plan.end_step();
979        }
980
981        Ok(view)
982    }
983
984    /// Resolve column names to indices
985    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
986        let mut indices = Vec::new();
987        let table_columns = table.column_names();
988
989        for col_name in columns {
990            let index = table_columns
991                .iter()
992                .position(|c| c.eq_ignore_ascii_case(col_name))
993                .ok_or_else(|| {
994                    let suggestion = self.find_similar_column(table, col_name);
995                    match suggestion {
996                        Some(similar) => anyhow::anyhow!(
997                            "Column '{}' not found. Did you mean '{}'?",
998                            col_name,
999                            similar
1000                        ),
1001                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1002                    }
1003                })?;
1004            indices.push(index);
1005        }
1006
1007        Ok(indices)
1008    }
1009
1010    /// Apply SELECT items (columns and computed expressions) to create new view
1011    fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
1012        debug!(
1013            "QueryEngine::apply_select_items - items: {:?}",
1014            select_items
1015        );
1016        debug!(
1017            "QueryEngine::apply_select_items - input view has {} rows",
1018            view.row_count()
1019        );
1020
1021        // Check if this is an aggregate query:
1022        // 1. At least one aggregate function exists
1023        // 2. All other items are either aggregates or constants (aggregate-compatible)
1024        let has_aggregates = select_items.iter().any(|item| match item {
1025            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1026            SelectItem::Column(_) => false,
1027            SelectItem::Star => false,
1028        });
1029
1030        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1031            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1032            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
1033            SelectItem::Star => false,      // Star is not aggregate-compatible
1034        });
1035
1036        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1037            // Special handling for aggregate queries with constants (no GROUP BY)
1038            // These should produce exactly one row
1039            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1040            return self.apply_aggregate_select(view, select_items);
1041        }
1042
1043        // Check if we need to create computed columns
1044        let has_computed_expressions = select_items
1045            .iter()
1046            .any(|item| matches!(item, SelectItem::Expression { .. }));
1047
1048        debug!(
1049            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1050            has_computed_expressions
1051        );
1052
1053        if !has_computed_expressions {
1054            // Simple case: only columns, use existing projection logic
1055            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1056            return Ok(view.with_columns(column_indices));
1057        }
1058
1059        // Complex case: we have computed expressions
1060        // IMPORTANT: We create a PROJECTED view, not a new table
1061        // This preserves the original DataTable reference
1062
1063        let source_table = view.source();
1064        let visible_rows = view.visible_row_indices();
1065
1066        // Create a temporary table just for the computed result view
1067        // But this table is only used for the current query result
1068        let mut computed_table = DataTable::new("query_result");
1069
1070        // First, expand any Star selectors to actual columns
1071        let mut expanded_items = Vec::new();
1072        for item in select_items {
1073            match item {
1074                SelectItem::Star => {
1075                    // Expand * to all columns from source table
1076                    for col_name in source_table.column_names() {
1077                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1078                            col_name.to_string(),
1079                        )));
1080                    }
1081                }
1082                _ => expanded_items.push(item.clone()),
1083            }
1084        }
1085
1086        // Add columns based on expanded SelectItems, handling duplicates
1087        let mut column_name_counts: std::collections::HashMap<String, usize> =
1088            std::collections::HashMap::new();
1089
1090        for item in &expanded_items {
1091            let base_name = match item {
1092                SelectItem::Column(col_ref) => col_ref.name.clone(),
1093                SelectItem::Expression { alias, .. } => alias.clone(),
1094                SelectItem::Star => unreachable!("Star should have been expanded"),
1095            };
1096
1097            // Check if this column name has been used before
1098            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1099            let column_name = if *count == 0 {
1100                // First occurrence, use the name as-is
1101                base_name.clone()
1102            } else {
1103                // Duplicate, append a suffix
1104                format!("{base_name}_{count}")
1105            };
1106            *count += 1;
1107
1108            computed_table.add_column(DataColumn::new(&column_name));
1109        }
1110
1111        // Calculate values for each row
1112        let mut evaluator =
1113            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1114
1115        for &row_idx in visible_rows {
1116            let mut row_values = Vec::new();
1117
1118            for item in &expanded_items {
1119                let value = match item {
1120                    SelectItem::Column(col_ref) => {
1121                        // Column reference with optional table prefix
1122                        let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1123                            // For qualified references, ONLY try qualified lookup - no fallback
1124                            let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1125                            let result = source_table.find_column_by_qualified_name(&qualified_name);
1126
1127                            // Debug: log what qualified names are available when lookup fails
1128                            if result.is_none() {
1129                                let available_qualified: Vec<String> = source_table.columns.iter()
1130                                    .filter_map(|c| c.qualified_name.clone())
1131                                    .collect();
1132                                let available_simple: Vec<String> = source_table.columns.iter()
1133                                    .map(|c| c.name.clone())
1134                                    .collect();
1135                                debug!(
1136                                    "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1137                                    qualified_name, available_qualified, available_simple
1138                                );
1139                                // This MUST return None to trigger error
1140                            }
1141                            result
1142                        } else {
1143                            // Simple column name lookup
1144                            source_table.get_column_index(&col_ref.name)
1145                        }
1146                        .ok_or_else(|| {
1147                            let display_name = if let Some(prefix) = &col_ref.table_prefix {
1148                                format!("{}.{}", prefix, col_ref.name)
1149                            } else {
1150                                col_ref.name.clone()
1151                            };
1152                            let suggestion = self.find_similar_column(source_table, &col_ref.name);
1153                            match suggestion {
1154                                Some(similar) => anyhow::anyhow!(
1155                                    "Column '{}' not found. Did you mean '{}'?",
1156                                    display_name,
1157                                    similar
1158                                ),
1159                                None => anyhow::anyhow!("Column '{}' not found", display_name),
1160                            }
1161                        })?;
1162                        let row = source_table
1163                            .get_row(row_idx)
1164                            .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1165                        row.get(col_idx)
1166                            .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1167                            .clone()
1168                    }
1169                    SelectItem::Expression { expr, .. } => {
1170                        // Computed expression
1171                        evaluator.evaluate(expr, row_idx)?
1172                    }
1173                    SelectItem::Star => unreachable!("Star should have been expanded"),
1174                };
1175                row_values.push(value);
1176            }
1177
1178            computed_table
1179                .add_row(DataRow::new(row_values))
1180                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1181        }
1182
1183        // Return a view of the computed result
1184        // This is a temporary view for this query only
1185        Ok(DataView::new(Arc::new(computed_table)))
1186    }
1187
1188    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1189    fn apply_aggregate_select(
1190        &self,
1191        view: DataView,
1192        select_items: &[SelectItem],
1193    ) -> Result<DataView> {
1194        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1195
1196        let source_table = view.source();
1197        let mut result_table = DataTable::new("aggregate_result");
1198
1199        // Add columns for each select item
1200        for item in select_items {
1201            let column_name = match item {
1202                SelectItem::Expression { alias, .. } => alias.clone(),
1203                _ => unreachable!("Should only have expressions in aggregate-only query"),
1204            };
1205            result_table.add_column(DataColumn::new(&column_name));
1206        }
1207
1208        // Create evaluator with visible rows from the view (for filtered aggregates)
1209        let visible_rows = view.visible_row_indices().to_vec();
1210        let mut evaluator =
1211            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1212                .with_visible_rows(visible_rows);
1213
1214        // Evaluate each aggregate expression once (they handle all rows internally)
1215        let mut row_values = Vec::new();
1216        for item in select_items {
1217            match item {
1218                SelectItem::Expression { expr, .. } => {
1219                    // The evaluator will handle aggregates over all rows
1220                    // We pass row_index=0 but aggregates ignore it and process all rows
1221                    let value = evaluator.evaluate(expr, 0)?;
1222                    row_values.push(value);
1223                }
1224                _ => unreachable!("Should only have expressions in aggregate-only query"),
1225            }
1226        }
1227
1228        // Add the single result row
1229        result_table
1230            .add_row(DataRow::new(row_values))
1231            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1232
1233        Ok(DataView::new(Arc::new(result_table)))
1234    }
1235
1236    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1237    fn resolve_select_columns(
1238        &self,
1239        table: &DataTable,
1240        select_items: &[SelectItem],
1241    ) -> Result<Vec<usize>> {
1242        let mut indices = Vec::new();
1243        let table_columns = table.column_names();
1244
1245        for item in select_items {
1246            match item {
1247                SelectItem::Column(col_ref) => {
1248                    // Check if this has a table prefix
1249                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
1250                        // For qualified references, ONLY try qualified lookup - no fallback
1251                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1252                        table.find_column_by_qualified_name(&qualified_name)
1253                            .ok_or_else(|| {
1254                                // Check if any columns have qualified names for better error message
1255                                let has_qualified = table.columns.iter()
1256                                    .any(|c| c.qualified_name.is_some());
1257                                if !has_qualified {
1258                                    anyhow::anyhow!(
1259                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1260                                        qualified_name, table_prefix
1261                                    )
1262                                } else {
1263                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
1264                                }
1265                            })?
1266                    } else {
1267                        // Simple column name lookup
1268                        table_columns
1269                            .iter()
1270                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1271                            .ok_or_else(|| {
1272                                let suggestion = self.find_similar_column(table, &col_ref.name);
1273                                match suggestion {
1274                                    Some(similar) => anyhow::anyhow!(
1275                                        "Column '{}' not found. Did you mean '{}'?",
1276                                        col_ref.name,
1277                                        similar
1278                                    ),
1279                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1280                                }
1281                            })?
1282                    };
1283                    indices.push(index);
1284                }
1285                SelectItem::Star => {
1286                    // Expand * to all column indices
1287                    for i in 0..table_columns.len() {
1288                        indices.push(i);
1289                    }
1290                }
1291                SelectItem::Expression { .. } => {
1292                    return Err(anyhow::anyhow!(
1293                        "Computed expressions require new table creation"
1294                    ));
1295                }
1296            }
1297        }
1298
1299        Ok(indices)
1300    }
1301
1302    /// Apply DISTINCT to remove duplicate rows
1303    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1304        use std::collections::HashSet;
1305
1306        let source = view.source();
1307        let visible_cols = view.visible_column_indices();
1308        let visible_rows = view.visible_row_indices();
1309
1310        // Build a set to track unique rows
1311        let mut seen_rows = HashSet::new();
1312        let mut unique_row_indices = Vec::new();
1313
1314        for &row_idx in visible_rows {
1315            // Build a key representing this row's visible column values
1316            let mut row_key = Vec::new();
1317            for &col_idx in visible_cols {
1318                let value = source
1319                    .get_value(row_idx, col_idx)
1320                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1321                // Convert value to a hashable representation
1322                row_key.push(format!("{:?}", value));
1323            }
1324
1325            // Check if we've seen this row before
1326            if seen_rows.insert(row_key) {
1327                // First time seeing this row combination
1328                unique_row_indices.push(row_idx);
1329            }
1330        }
1331
1332        // Create a new view with only unique rows
1333        Ok(view.with_rows(unique_row_indices))
1334    }
1335
1336    /// Apply multi-column ORDER BY sorting to the view
1337    fn apply_multi_order_by(
1338        &self,
1339        mut view: DataView,
1340        order_by_columns: &[OrderByColumn],
1341    ) -> Result<DataView> {
1342        // Build list of (source_column_index, ascending) tuples
1343        let mut sort_columns = Vec::new();
1344
1345        for order_col in order_by_columns {
1346            // Try to find the column index
1347            // Check in the current view's source table (this handles both regular columns and computed columns)
1348            // This is especially important after GROUP BY where we have a new result table with aggregate aliases
1349            let col_index = view
1350                .source()
1351                .get_column_index(&order_col.column)
1352                .ok_or_else(|| {
1353                    // If not found, provide helpful error with suggestions
1354                    let suggestion = self.find_similar_column(view.source(), &order_col.column);
1355                    match suggestion {
1356                        Some(similar) => anyhow::anyhow!(
1357                            "Column '{}' not found. Did you mean '{}'?",
1358                            order_col.column,
1359                            similar
1360                        ),
1361                        None => {
1362                            // Also list available columns for debugging
1363                            let available_cols = view.source().column_names().join(", ");
1364                            anyhow::anyhow!(
1365                                "Column '{}' not found. Available columns: {}",
1366                                order_col.column,
1367                                available_cols
1368                            )
1369                        }
1370                    }
1371                })?;
1372
1373            let ascending = matches!(order_col.direction, SortDirection::Asc);
1374            sort_columns.push((col_index, ascending));
1375        }
1376
1377        // Apply multi-column sorting
1378        view.apply_multi_sort(&sort_columns)?;
1379        Ok(view)
1380    }
1381
1382    /// Apply GROUP BY to the view with optional HAVING clause
1383    fn apply_group_by(
1384        &self,
1385        view: DataView,
1386        group_by_exprs: &[SqlExpression],
1387        select_items: &[SelectItem],
1388        having: Option<&SqlExpression>,
1389        plan: &mut ExecutionPlanBuilder,
1390    ) -> Result<DataView> {
1391        // Use the new expression-based GROUP BY implementation
1392        let (result_view, phase_info) = self.apply_group_by_expressions(
1393            view,
1394            group_by_exprs,
1395            select_items,
1396            having,
1397            self.case_insensitive,
1398            self.date_notation.clone(),
1399        )?;
1400
1401        // Add detailed phase information to the execution plan
1402        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1403        plan.add_detail(format!(
1404            "Phase 1 - Group Building: {:.3}ms",
1405            phase_info.phase2_key_building.as_secs_f64() * 1000.0
1406        ));
1407        plan.add_detail(format!(
1408            "  • Processing {} rows into {} groups",
1409            phase_info.total_rows, phase_info.num_groups
1410        ));
1411        plan.add_detail(format!(
1412            "Phase 2 - Aggregation: {:.3}ms",
1413            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
1414        ));
1415        if phase_info.phase4_having_evaluation > Duration::ZERO {
1416            plan.add_detail(format!(
1417                "Phase 3 - HAVING Filter: {:.3}ms",
1418                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
1419            ));
1420            plan.add_detail(format!(
1421                "  • Filtered {} groups",
1422                phase_info.groups_filtered_by_having
1423            ));
1424        }
1425        plan.add_detail(format!(
1426            "Total GROUP BY time: {:.3}ms",
1427            phase_info.total_time.as_secs_f64() * 1000.0
1428        ));
1429
1430        Ok(result_view)
1431    }
1432
1433    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
1434    /// This helps pre-size hash tables for better performance
1435    pub fn estimate_group_cardinality(
1436        &self,
1437        view: &DataView,
1438        group_by_exprs: &[SqlExpression],
1439    ) -> usize {
1440        // If we have few rows, just return the row count as upper bound
1441        let row_count = view.get_visible_rows().len();
1442        if row_count <= 100 {
1443            return row_count;
1444        }
1445
1446        // Sample first 1000 rows or 10% of data, whichever is smaller
1447        let sample_size = min(1000, row_count / 10).max(100);
1448        let mut seen = FxHashSet::default();
1449
1450        let visible_rows = view.get_visible_rows();
1451        for (i, &row_idx) in visible_rows.iter().enumerate() {
1452            if i >= sample_size {
1453                break;
1454            }
1455
1456            // Evaluate GROUP BY expressions for this row
1457            let mut key_values = Vec::new();
1458            for expr in group_by_exprs {
1459                let mut evaluator = ArithmeticEvaluator::new(view.source());
1460                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1461                key_values.push(value);
1462            }
1463
1464            seen.insert(key_values);
1465        }
1466
1467        // Estimate total cardinality based on sample
1468        let sample_cardinality = seen.len();
1469        let estimated = (sample_cardinality * row_count) / sample_size;
1470
1471        // Cap at row count and ensure minimum of sample cardinality
1472        estimated.min(row_count).max(sample_cardinality)
1473    }
1474}
1475
1476#[cfg(test)]
1477mod tests {
1478    use super::*;
1479    use crate::data::datatable::{DataColumn, DataRow, DataValue};
1480
1481    fn create_test_table() -> Arc<DataTable> {
1482        let mut table = DataTable::new("test");
1483
1484        // Add columns
1485        table.add_column(DataColumn::new("id"));
1486        table.add_column(DataColumn::new("name"));
1487        table.add_column(DataColumn::new("age"));
1488
1489        // Add rows
1490        table
1491            .add_row(DataRow::new(vec![
1492                DataValue::Integer(1),
1493                DataValue::String("Alice".to_string()),
1494                DataValue::Integer(30),
1495            ]))
1496            .unwrap();
1497
1498        table
1499            .add_row(DataRow::new(vec![
1500                DataValue::Integer(2),
1501                DataValue::String("Bob".to_string()),
1502                DataValue::Integer(25),
1503            ]))
1504            .unwrap();
1505
1506        table
1507            .add_row(DataRow::new(vec![
1508                DataValue::Integer(3),
1509                DataValue::String("Charlie".to_string()),
1510                DataValue::Integer(35),
1511            ]))
1512            .unwrap();
1513
1514        Arc::new(table)
1515    }
1516
1517    #[test]
1518    fn test_select_all() {
1519        let table = create_test_table();
1520        let engine = QueryEngine::new();
1521
1522        let view = engine
1523            .execute(table.clone(), "SELECT * FROM users")
1524            .unwrap();
1525        assert_eq!(view.row_count(), 3);
1526        assert_eq!(view.column_count(), 3);
1527    }
1528
1529    #[test]
1530    fn test_select_columns() {
1531        let table = create_test_table();
1532        let engine = QueryEngine::new();
1533
1534        let view = engine
1535            .execute(table.clone(), "SELECT name, age FROM users")
1536            .unwrap();
1537        assert_eq!(view.row_count(), 3);
1538        assert_eq!(view.column_count(), 2);
1539    }
1540
1541    #[test]
1542    fn test_select_with_limit() {
1543        let table = create_test_table();
1544        let engine = QueryEngine::new();
1545
1546        let view = engine
1547            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1548            .unwrap();
1549        assert_eq!(view.row_count(), 2);
1550    }
1551
1552    #[test]
1553    fn test_type_coercion_contains() {
1554        // Initialize tracing for debug output
1555        let _ = tracing_subscriber::fmt()
1556            .with_max_level(tracing::Level::DEBUG)
1557            .try_init();
1558
1559        let mut table = DataTable::new("test");
1560        table.add_column(DataColumn::new("id"));
1561        table.add_column(DataColumn::new("status"));
1562        table.add_column(DataColumn::new("price"));
1563
1564        // Add test data with mixed types
1565        table
1566            .add_row(DataRow::new(vec![
1567                DataValue::Integer(1),
1568                DataValue::String("Pending".to_string()),
1569                DataValue::Float(99.99),
1570            ]))
1571            .unwrap();
1572
1573        table
1574            .add_row(DataRow::new(vec![
1575                DataValue::Integer(2),
1576                DataValue::String("Confirmed".to_string()),
1577                DataValue::Float(150.50),
1578            ]))
1579            .unwrap();
1580
1581        table
1582            .add_row(DataRow::new(vec![
1583                DataValue::Integer(3),
1584                DataValue::String("Pending".to_string()),
1585                DataValue::Float(75.00),
1586            ]))
1587            .unwrap();
1588
1589        let table = Arc::new(table);
1590        let engine = QueryEngine::new();
1591
1592        println!("\n=== Testing WHERE clause with Contains ===");
1593        println!("Table has {} rows", table.row_count());
1594        for i in 0..table.row_count() {
1595            let status = table.get_value(i, 1);
1596            println!("Row {i}: status = {status:?}");
1597        }
1598
1599        // Test 1: Basic string contains (should work)
1600        println!("\n--- Test 1: status.Contains('pend') ---");
1601        let result = engine.execute(
1602            table.clone(),
1603            "SELECT * FROM test WHERE status.Contains('pend')",
1604        );
1605        match result {
1606            Ok(view) => {
1607                println!("SUCCESS: Found {} matching rows", view.row_count());
1608                assert_eq!(view.row_count(), 2); // Should find both Pending rows
1609            }
1610            Err(e) => {
1611                panic!("Query failed: {e}");
1612            }
1613        }
1614
1615        // Test 2: Numeric contains (should work with type coercion)
1616        println!("\n--- Test 2: price.Contains('9') ---");
1617        let result = engine.execute(
1618            table.clone(),
1619            "SELECT * FROM test WHERE price.Contains('9')",
1620        );
1621        match result {
1622            Ok(view) => {
1623                println!(
1624                    "SUCCESS: Found {} matching rows with price containing '9'",
1625                    view.row_count()
1626                );
1627                // Should find 99.99 row
1628                assert!(view.row_count() >= 1);
1629            }
1630            Err(e) => {
1631                panic!("Numeric coercion query failed: {e}");
1632            }
1633        }
1634
1635        println!("\n=== All tests passed! ===");
1636    }
1637
1638    #[test]
1639    fn test_not_in_clause() {
1640        // Initialize tracing for debug output
1641        let _ = tracing_subscriber::fmt()
1642            .with_max_level(tracing::Level::DEBUG)
1643            .try_init();
1644
1645        let mut table = DataTable::new("test");
1646        table.add_column(DataColumn::new("id"));
1647        table.add_column(DataColumn::new("country"));
1648
1649        // Add test data
1650        table
1651            .add_row(DataRow::new(vec![
1652                DataValue::Integer(1),
1653                DataValue::String("CA".to_string()),
1654            ]))
1655            .unwrap();
1656
1657        table
1658            .add_row(DataRow::new(vec![
1659                DataValue::Integer(2),
1660                DataValue::String("US".to_string()),
1661            ]))
1662            .unwrap();
1663
1664        table
1665            .add_row(DataRow::new(vec![
1666                DataValue::Integer(3),
1667                DataValue::String("UK".to_string()),
1668            ]))
1669            .unwrap();
1670
1671        let table = Arc::new(table);
1672        let engine = QueryEngine::new();
1673
1674        println!("\n=== Testing NOT IN clause ===");
1675        println!("Table has {} rows", table.row_count());
1676        for i in 0..table.row_count() {
1677            let country = table.get_value(i, 1);
1678            println!("Row {i}: country = {country:?}");
1679        }
1680
1681        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
1682        println!("\n--- Test: country NOT IN ('CA') ---");
1683        let result = engine.execute(
1684            table.clone(),
1685            "SELECT * FROM test WHERE country NOT IN ('CA')",
1686        );
1687        match result {
1688            Ok(view) => {
1689                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1690                assert_eq!(view.row_count(), 2); // Should find US and UK
1691            }
1692            Err(e) => {
1693                panic!("NOT IN query failed: {e}");
1694            }
1695        }
1696
1697        println!("\n=== NOT IN test complete! ===");
1698    }
1699
1700    #[test]
1701    fn test_case_insensitive_in_and_not_in() {
1702        // Initialize tracing for debug output
1703        let _ = tracing_subscriber::fmt()
1704            .with_max_level(tracing::Level::DEBUG)
1705            .try_init();
1706
1707        let mut table = DataTable::new("test");
1708        table.add_column(DataColumn::new("id"));
1709        table.add_column(DataColumn::new("country"));
1710
1711        // Add test data with mixed case
1712        table
1713            .add_row(DataRow::new(vec![
1714                DataValue::Integer(1),
1715                DataValue::String("CA".to_string()), // uppercase
1716            ]))
1717            .unwrap();
1718
1719        table
1720            .add_row(DataRow::new(vec![
1721                DataValue::Integer(2),
1722                DataValue::String("us".to_string()), // lowercase
1723            ]))
1724            .unwrap();
1725
1726        table
1727            .add_row(DataRow::new(vec![
1728                DataValue::Integer(3),
1729                DataValue::String("UK".to_string()), // uppercase
1730            ]))
1731            .unwrap();
1732
1733        let table = Arc::new(table);
1734
1735        println!("\n=== Testing Case-Insensitive IN clause ===");
1736        println!("Table has {} rows", table.row_count());
1737        for i in 0..table.row_count() {
1738            let country = table.get_value(i, 1);
1739            println!("Row {i}: country = {country:?}");
1740        }
1741
1742        // Test case-insensitive IN - should match 'CA' with 'ca'
1743        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
1744        let engine = QueryEngine::with_case_insensitive(true);
1745        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1746        match result {
1747            Ok(view) => {
1748                println!(
1749                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
1750                    view.row_count()
1751                );
1752                assert_eq!(view.row_count(), 1); // Should find CA row
1753            }
1754            Err(e) => {
1755                panic!("Case-insensitive IN query failed: {e}");
1756            }
1757        }
1758
1759        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
1760        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
1761        let result = engine.execute(
1762            table.clone(),
1763            "SELECT * FROM test WHERE country NOT IN ('ca')",
1764        );
1765        match result {
1766            Ok(view) => {
1767                println!(
1768                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
1769                    view.row_count()
1770                );
1771                assert_eq!(view.row_count(), 2); // Should find us and UK rows
1772            }
1773            Err(e) => {
1774                panic!("Case-insensitive NOT IN query failed: {e}");
1775            }
1776        }
1777
1778        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
1779        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
1780        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
1781        let result = engine_case_sensitive
1782            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1783        match result {
1784            Ok(view) => {
1785                println!(
1786                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
1787                    view.row_count()
1788                );
1789                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
1790            }
1791            Err(e) => {
1792                panic!("Case-sensitive IN query failed: {e}");
1793            }
1794        }
1795
1796        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
1797    }
1798
1799    #[test]
1800    #[ignore = "Parentheses in WHERE clause not yet implemented"]
1801    fn test_parentheses_in_where_clause() {
1802        // Initialize tracing for debug output
1803        let _ = tracing_subscriber::fmt()
1804            .with_max_level(tracing::Level::DEBUG)
1805            .try_init();
1806
1807        let mut table = DataTable::new("test");
1808        table.add_column(DataColumn::new("id"));
1809        table.add_column(DataColumn::new("status"));
1810        table.add_column(DataColumn::new("priority"));
1811
1812        // Add test data
1813        table
1814            .add_row(DataRow::new(vec![
1815                DataValue::Integer(1),
1816                DataValue::String("Pending".to_string()),
1817                DataValue::String("High".to_string()),
1818            ]))
1819            .unwrap();
1820
1821        table
1822            .add_row(DataRow::new(vec![
1823                DataValue::Integer(2),
1824                DataValue::String("Complete".to_string()),
1825                DataValue::String("High".to_string()),
1826            ]))
1827            .unwrap();
1828
1829        table
1830            .add_row(DataRow::new(vec![
1831                DataValue::Integer(3),
1832                DataValue::String("Pending".to_string()),
1833                DataValue::String("Low".to_string()),
1834            ]))
1835            .unwrap();
1836
1837        table
1838            .add_row(DataRow::new(vec![
1839                DataValue::Integer(4),
1840                DataValue::String("Complete".to_string()),
1841                DataValue::String("Low".to_string()),
1842            ]))
1843            .unwrap();
1844
1845        let table = Arc::new(table);
1846        let engine = QueryEngine::new();
1847
1848        println!("\n=== Testing Parentheses in WHERE clause ===");
1849        println!("Table has {} rows", table.row_count());
1850        for i in 0..table.row_count() {
1851            let status = table.get_value(i, 1);
1852            let priority = table.get_value(i, 2);
1853            println!("Row {i}: status = {status:?}, priority = {priority:?}");
1854        }
1855
1856        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
1857        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
1858        let result = engine.execute(
1859            table.clone(),
1860            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
1861        );
1862        match result {
1863            Ok(view) => {
1864                println!(
1865                    "SUCCESS: Found {} rows with parenthetical logic",
1866                    view.row_count()
1867                );
1868                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
1869            }
1870            Err(e) => {
1871                panic!("Parentheses query failed: {e}");
1872            }
1873        }
1874
1875        println!("\n=== Parentheses test complete! ===");
1876    }
1877
1878    #[test]
1879    #[ignore = "Numeric type coercion needs fixing"]
1880    fn test_numeric_type_coercion() {
1881        // Initialize tracing for debug output
1882        let _ = tracing_subscriber::fmt()
1883            .with_max_level(tracing::Level::DEBUG)
1884            .try_init();
1885
1886        let mut table = DataTable::new("test");
1887        table.add_column(DataColumn::new("id"));
1888        table.add_column(DataColumn::new("price"));
1889        table.add_column(DataColumn::new("quantity"));
1890
1891        // Add test data with different numeric types
1892        table
1893            .add_row(DataRow::new(vec![
1894                DataValue::Integer(1),
1895                DataValue::Float(99.50), // Contains '.'
1896                DataValue::Integer(100),
1897            ]))
1898            .unwrap();
1899
1900        table
1901            .add_row(DataRow::new(vec![
1902                DataValue::Integer(2),
1903                DataValue::Float(150.0), // Contains '.' and '0'
1904                DataValue::Integer(200),
1905            ]))
1906            .unwrap();
1907
1908        table
1909            .add_row(DataRow::new(vec![
1910                DataValue::Integer(3),
1911                DataValue::Integer(75), // No decimal point
1912                DataValue::Integer(50),
1913            ]))
1914            .unwrap();
1915
1916        let table = Arc::new(table);
1917        let engine = QueryEngine::new();
1918
1919        println!("\n=== Testing Numeric Type Coercion ===");
1920        println!("Table has {} rows", table.row_count());
1921        for i in 0..table.row_count() {
1922            let price = table.get_value(i, 1);
1923            let quantity = table.get_value(i, 2);
1924            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
1925        }
1926
1927        // Test Contains on float values - should find rows with decimal points
1928        println!("\n--- Test: price.Contains('.') ---");
1929        let result = engine.execute(
1930            table.clone(),
1931            "SELECT * FROM test WHERE price.Contains('.')",
1932        );
1933        match result {
1934            Ok(view) => {
1935                println!(
1936                    "SUCCESS: Found {} rows with decimal points in price",
1937                    view.row_count()
1938                );
1939                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
1940            }
1941            Err(e) => {
1942                panic!("Numeric Contains query failed: {e}");
1943            }
1944        }
1945
1946        // Test Contains on integer values converted to string
1947        println!("\n--- Test: quantity.Contains('0') ---");
1948        let result = engine.execute(
1949            table.clone(),
1950            "SELECT * FROM test WHERE quantity.Contains('0')",
1951        );
1952        match result {
1953            Ok(view) => {
1954                println!(
1955                    "SUCCESS: Found {} rows with '0' in quantity",
1956                    view.row_count()
1957                );
1958                assert_eq!(view.row_count(), 2); // Should find 100 and 200
1959            }
1960            Err(e) => {
1961                panic!("Integer Contains query failed: {e}");
1962            }
1963        }
1964
1965        println!("\n=== Numeric type coercion test complete! ===");
1966    }
1967
1968    #[test]
1969    fn test_datetime_comparisons() {
1970        // Initialize tracing for debug output
1971        let _ = tracing_subscriber::fmt()
1972            .with_max_level(tracing::Level::DEBUG)
1973            .try_init();
1974
1975        let mut table = DataTable::new("test");
1976        table.add_column(DataColumn::new("id"));
1977        table.add_column(DataColumn::new("created_date"));
1978
1979        // Add test data with date strings (as they would come from CSV)
1980        table
1981            .add_row(DataRow::new(vec![
1982                DataValue::Integer(1),
1983                DataValue::String("2024-12-15".to_string()),
1984            ]))
1985            .unwrap();
1986
1987        table
1988            .add_row(DataRow::new(vec![
1989                DataValue::Integer(2),
1990                DataValue::String("2025-01-15".to_string()),
1991            ]))
1992            .unwrap();
1993
1994        table
1995            .add_row(DataRow::new(vec![
1996                DataValue::Integer(3),
1997                DataValue::String("2025-02-15".to_string()),
1998            ]))
1999            .unwrap();
2000
2001        let table = Arc::new(table);
2002        let engine = QueryEngine::new();
2003
2004        println!("\n=== Testing DateTime Comparisons ===");
2005        println!("Table has {} rows", table.row_count());
2006        for i in 0..table.row_count() {
2007            let date = table.get_value(i, 1);
2008            println!("Row {i}: created_date = {date:?}");
2009        }
2010
2011        // Test DateTime constructor comparison - should find dates after 2025-01-01
2012        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2013        let result = engine.execute(
2014            table.clone(),
2015            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2016        );
2017        match result {
2018            Ok(view) => {
2019                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2020                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
2021            }
2022            Err(e) => {
2023                panic!("DateTime comparison query failed: {e}");
2024            }
2025        }
2026
2027        println!("\n=== DateTime comparison test complete! ===");
2028    }
2029
2030    #[test]
2031    fn test_not_with_method_calls() {
2032        // Initialize tracing for debug output
2033        let _ = tracing_subscriber::fmt()
2034            .with_max_level(tracing::Level::DEBUG)
2035            .try_init();
2036
2037        let mut table = DataTable::new("test");
2038        table.add_column(DataColumn::new("id"));
2039        table.add_column(DataColumn::new("status"));
2040
2041        // Add test data
2042        table
2043            .add_row(DataRow::new(vec![
2044                DataValue::Integer(1),
2045                DataValue::String("Pending Review".to_string()),
2046            ]))
2047            .unwrap();
2048
2049        table
2050            .add_row(DataRow::new(vec![
2051                DataValue::Integer(2),
2052                DataValue::String("Complete".to_string()),
2053            ]))
2054            .unwrap();
2055
2056        table
2057            .add_row(DataRow::new(vec![
2058                DataValue::Integer(3),
2059                DataValue::String("Pending Approval".to_string()),
2060            ]))
2061            .unwrap();
2062
2063        let table = Arc::new(table);
2064        let engine = QueryEngine::with_case_insensitive(true);
2065
2066        println!("\n=== Testing NOT with Method Calls ===");
2067        println!("Table has {} rows", table.row_count());
2068        for i in 0..table.row_count() {
2069            let status = table.get_value(i, 1);
2070            println!("Row {i}: status = {status:?}");
2071        }
2072
2073        // Test NOT with Contains - should exclude rows containing "pend"
2074        println!("\n--- Test: NOT status.Contains('pend') ---");
2075        let result = engine.execute(
2076            table.clone(),
2077            "SELECT * FROM test WHERE NOT status.Contains('pend')",
2078        );
2079        match result {
2080            Ok(view) => {
2081                println!(
2082                    "SUCCESS: Found {} rows NOT containing 'pend'",
2083                    view.row_count()
2084                );
2085                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2086            }
2087            Err(e) => {
2088                panic!("NOT Contains query failed: {e}");
2089            }
2090        }
2091
2092        // Test NOT with StartsWith
2093        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2094        let result = engine.execute(
2095            table.clone(),
2096            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2097        );
2098        match result {
2099            Ok(view) => {
2100                println!(
2101                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
2102                    view.row_count()
2103                );
2104                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2105            }
2106            Err(e) => {
2107                panic!("NOT StartsWith query failed: {e}");
2108            }
2109        }
2110
2111        println!("\n=== NOT with method calls test complete! ===");
2112    }
2113
2114    #[test]
2115    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2116    fn test_complex_logical_expressions() {
2117        // Initialize tracing for debug output
2118        let _ = tracing_subscriber::fmt()
2119            .with_max_level(tracing::Level::DEBUG)
2120            .try_init();
2121
2122        let mut table = DataTable::new("test");
2123        table.add_column(DataColumn::new("id"));
2124        table.add_column(DataColumn::new("status"));
2125        table.add_column(DataColumn::new("priority"));
2126        table.add_column(DataColumn::new("assigned"));
2127
2128        // Add comprehensive test data
2129        table
2130            .add_row(DataRow::new(vec![
2131                DataValue::Integer(1),
2132                DataValue::String("Pending".to_string()),
2133                DataValue::String("High".to_string()),
2134                DataValue::String("John".to_string()),
2135            ]))
2136            .unwrap();
2137
2138        table
2139            .add_row(DataRow::new(vec![
2140                DataValue::Integer(2),
2141                DataValue::String("Complete".to_string()),
2142                DataValue::String("High".to_string()),
2143                DataValue::String("Jane".to_string()),
2144            ]))
2145            .unwrap();
2146
2147        table
2148            .add_row(DataRow::new(vec![
2149                DataValue::Integer(3),
2150                DataValue::String("Pending".to_string()),
2151                DataValue::String("Low".to_string()),
2152                DataValue::String("John".to_string()),
2153            ]))
2154            .unwrap();
2155
2156        table
2157            .add_row(DataRow::new(vec![
2158                DataValue::Integer(4),
2159                DataValue::String("In Progress".to_string()),
2160                DataValue::String("Medium".to_string()),
2161                DataValue::String("Jane".to_string()),
2162            ]))
2163            .unwrap();
2164
2165        let table = Arc::new(table);
2166        let engine = QueryEngine::new();
2167
2168        println!("\n=== Testing Complex Logical Expressions ===");
2169        println!("Table has {} rows", table.row_count());
2170        for i in 0..table.row_count() {
2171            let status = table.get_value(i, 1);
2172            let priority = table.get_value(i, 2);
2173            let assigned = table.get_value(i, 3);
2174            println!(
2175                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2176            );
2177        }
2178
2179        // Test complex AND/OR logic
2180        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2181        let result = engine.execute(
2182            table.clone(),
2183            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2184        );
2185        match result {
2186            Ok(view) => {
2187                println!(
2188                    "SUCCESS: Found {} rows with complex logic",
2189                    view.row_count()
2190                );
2191                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2192            }
2193            Err(e) => {
2194                panic!("Complex logic query failed: {e}");
2195            }
2196        }
2197
2198        // Test NOT with complex expressions
2199        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2200        let result = engine.execute(
2201            table.clone(),
2202            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2203        );
2204        match result {
2205            Ok(view) => {
2206                println!(
2207                    "SUCCESS: Found {} rows with NOT complex logic",
2208                    view.row_count()
2209                );
2210                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2211            }
2212            Err(e) => {
2213                panic!("NOT complex logic query failed: {e}");
2214            }
2215        }
2216
2217        println!("\n=== Complex logical expressions test complete! ===");
2218    }
2219
2220    #[test]
2221    fn test_mixed_data_types_and_edge_cases() {
2222        // Initialize tracing for debug output
2223        let _ = tracing_subscriber::fmt()
2224            .with_max_level(tracing::Level::DEBUG)
2225            .try_init();
2226
2227        let mut table = DataTable::new("test");
2228        table.add_column(DataColumn::new("id"));
2229        table.add_column(DataColumn::new("value"));
2230        table.add_column(DataColumn::new("nullable_field"));
2231
2232        // Add test data with mixed types and edge cases
2233        table
2234            .add_row(DataRow::new(vec![
2235                DataValue::Integer(1),
2236                DataValue::String("123.45".to_string()),
2237                DataValue::String("present".to_string()),
2238            ]))
2239            .unwrap();
2240
2241        table
2242            .add_row(DataRow::new(vec![
2243                DataValue::Integer(2),
2244                DataValue::Float(678.90),
2245                DataValue::Null,
2246            ]))
2247            .unwrap();
2248
2249        table
2250            .add_row(DataRow::new(vec![
2251                DataValue::Integer(3),
2252                DataValue::Boolean(true),
2253                DataValue::String("also present".to_string()),
2254            ]))
2255            .unwrap();
2256
2257        table
2258            .add_row(DataRow::new(vec![
2259                DataValue::Integer(4),
2260                DataValue::String("false".to_string()),
2261                DataValue::Null,
2262            ]))
2263            .unwrap();
2264
2265        let table = Arc::new(table);
2266        let engine = QueryEngine::new();
2267
2268        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2269        println!("Table has {} rows", table.row_count());
2270        for i in 0..table.row_count() {
2271            let value = table.get_value(i, 1);
2272            let nullable = table.get_value(i, 2);
2273            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2274        }
2275
2276        // Test type coercion with boolean Contains
2277        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2278        let result = engine.execute(
2279            table.clone(),
2280            "SELECT * FROM test WHERE value.Contains('true')",
2281        );
2282        match result {
2283            Ok(view) => {
2284                println!(
2285                    "SUCCESS: Found {} rows with boolean coercion",
2286                    view.row_count()
2287                );
2288                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2289            }
2290            Err(e) => {
2291                panic!("Boolean coercion query failed: {e}");
2292            }
2293        }
2294
2295        // Test multiple IN values with mixed types
2296        println!("\n--- Test: id IN (1, 3) ---");
2297        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2298        match result {
2299            Ok(view) => {
2300                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2301                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2302            }
2303            Err(e) => {
2304                panic!("Multiple IN values query failed: {e}");
2305            }
2306        }
2307
2308        println!("\n=== Mixed data types test complete! ===");
2309    }
2310
2311    /// Test that aggregate-only queries return exactly one row (regression test)
2312    #[test]
2313    fn test_aggregate_only_single_row() {
2314        let table = create_test_stock_data();
2315        let engine = QueryEngine::new();
2316
2317        // Test query with multiple aggregates - should return exactly 1 row
2318        let result = engine
2319            .execute(
2320                table.clone(),
2321                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2322            )
2323            .expect("Query should succeed");
2324
2325        assert_eq!(
2326            result.row_count(),
2327            1,
2328            "Aggregate-only query should return exactly 1 row"
2329        );
2330        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2331
2332        // Verify the actual values are correct
2333        let source = result.source();
2334        let row = source.get_row(0).expect("Should have first row");
2335
2336        // COUNT(*) should be 5 (total rows)
2337        assert_eq!(row.values[0], DataValue::Integer(5));
2338
2339        // MIN should be 99.5
2340        assert_eq!(row.values[1], DataValue::Float(99.5));
2341
2342        // MAX should be 105.0
2343        assert_eq!(row.values[2], DataValue::Float(105.0));
2344
2345        // AVG should be approximately 102.4
2346        if let DataValue::Float(avg) = &row.values[3] {
2347            assert!(
2348                (avg - 102.4).abs() < 0.01,
2349                "Average should be approximately 102.4, got {}",
2350                avg
2351            );
2352        } else {
2353            panic!("AVG should return a Float value");
2354        }
2355    }
2356
2357    /// Test single aggregate function returns single row
2358    #[test]
2359    fn test_single_aggregate_single_row() {
2360        let table = create_test_stock_data();
2361        let engine = QueryEngine::new();
2362
2363        let result = engine
2364            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2365            .expect("Query should succeed");
2366
2367        assert_eq!(
2368            result.row_count(),
2369            1,
2370            "Single aggregate query should return exactly 1 row"
2371        );
2372        assert_eq!(result.column_count(), 1, "Should have 1 column");
2373
2374        let source = result.source();
2375        let row = source.get_row(0).expect("Should have first row");
2376        assert_eq!(row.values[0], DataValue::Integer(5));
2377    }
2378
2379    /// Test aggregate with WHERE clause filtering
2380    #[test]
2381    fn test_aggregate_with_where_single_row() {
2382        let table = create_test_stock_data();
2383        let engine = QueryEngine::new();
2384
2385        // Filter to only high-value stocks (>= 103.0) and aggregate
2386        let result = engine
2387            .execute(
2388                table.clone(),
2389                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2390            )
2391            .expect("Query should succeed");
2392
2393        assert_eq!(
2394            result.row_count(),
2395            1,
2396            "Filtered aggregate query should return exactly 1 row"
2397        );
2398        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2399
2400        let source = result.source();
2401        let row = source.get_row(0).expect("Should have first row");
2402
2403        // Should find 2 rows (103.5 and 105.0)
2404        assert_eq!(row.values[0], DataValue::Integer(2));
2405        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2406        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2407    }
2408
2409    #[test]
2410    fn test_not_in_parsing() {
2411        use crate::sql::recursive_parser::Parser;
2412
2413        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2414        println!("\n=== Testing NOT IN parsing ===");
2415        println!("Parsing query: {query}");
2416
2417        let mut parser = Parser::new(query);
2418        match parser.parse() {
2419            Ok(statement) => {
2420                println!("Parsed statement: {statement:#?}");
2421                if let Some(where_clause) = statement.where_clause {
2422                    println!("WHERE conditions: {:#?}", where_clause.conditions);
2423                    if let Some(first_condition) = where_clause.conditions.first() {
2424                        println!("First condition expression: {:#?}", first_condition.expr);
2425                    }
2426                }
2427            }
2428            Err(e) => {
2429                panic!("Parse error: {e}");
2430            }
2431        }
2432    }
2433
2434    /// Create test stock data for aggregate testing
2435    fn create_test_stock_data() -> Arc<DataTable> {
2436        let mut table = DataTable::new("stock");
2437
2438        table.add_column(DataColumn::new("symbol"));
2439        table.add_column(DataColumn::new("close"));
2440        table.add_column(DataColumn::new("volume"));
2441
2442        // Add 5 rows of test data
2443        let test_data = vec![
2444            ("AAPL", 99.5, 1000),
2445            ("AAPL", 101.2, 1500),
2446            ("AAPL", 103.5, 2000),
2447            ("AAPL", 105.0, 1200),
2448            ("AAPL", 102.8, 1800),
2449        ];
2450
2451        for (symbol, close, volume) in test_data {
2452            table
2453                .add_row(DataRow::new(vec![
2454                    DataValue::String(symbol.to_string()),
2455                    DataValue::Float(close),
2456                    DataValue::Integer(volume),
2457                ]))
2458                .expect("Should add row successfully");
2459        }
2460
2461        Arc::new(table)
2462    }
2463}
2464
2465#[cfg(test)]
2466#[path = "query_engine_tests.rs"]
2467mod query_engine_tests;