sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
20use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
21use crate::sql::parser::ast::ColumnRef;
22use crate::sql::parser::ast::TableSource;
23use crate::sql::recursive_parser::{
24    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
25    TableFunction,
26};
27
28/// Query engine that executes SQL directly on `DataTable`
29#[derive(Clone)]
30pub struct QueryEngine {
31    case_insensitive: bool,
32    date_notation: String,
33    _behavior_config: Option<BehaviorConfig>,
34}
35
36impl Default for QueryEngine {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl QueryEngine {
43    #[must_use]
44    pub fn new() -> Self {
45        Self {
46            case_insensitive: false,
47            date_notation: get_date_notation(),
48            _behavior_config: None,
49        }
50    }
51
52    #[must_use]
53    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
54        let case_insensitive = config.case_insensitive_default;
55        // Use get_date_notation() to respect environment variable override
56        let date_notation = get_date_notation();
57        Self {
58            case_insensitive,
59            date_notation,
60            _behavior_config: Some(config),
61        }
62    }
63
64    #[must_use]
65    pub fn with_date_notation(_date_notation: String) -> Self {
66        Self {
67            case_insensitive: false,
68            date_notation: get_date_notation(), // Always use the global function
69            _behavior_config: None,
70        }
71    }
72
73    #[must_use]
74    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
75        Self {
76            case_insensitive,
77            date_notation: get_date_notation(),
78            _behavior_config: None,
79        }
80    }
81
82    #[must_use]
83    pub fn with_case_insensitive_and_date_notation(
84        case_insensitive: bool,
85        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
86    ) -> Self {
87        Self {
88            case_insensitive,
89            date_notation: get_date_notation(), // Always use the global function
90            _behavior_config: None,
91        }
92    }
93
94    /// Find a column name similar to the given name using edit distance
95    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
96        let columns = table.column_names();
97        let mut best_match: Option<(String, usize)> = None;
98
99        for col in columns {
100            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
101            // Only suggest if distance is small (likely a typo)
102            // Allow up to 3 edits for longer names
103            let max_distance = if name.len() > 10 { 3 } else { 2 };
104            if distance <= max_distance {
105                match &best_match {
106                    None => best_match = Some((col, distance)),
107                    Some((_, best_dist)) if distance < *best_dist => {
108                        best_match = Some((col, distance));
109                    }
110                    _ => {}
111                }
112            }
113        }
114
115        best_match.map(|(name, _)| name)
116    }
117
118    /// Calculate Levenshtein edit distance between two strings
119    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
120        let len1 = s1.len();
121        let len2 = s2.len();
122        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
123
124        for i in 0..=len1 {
125            matrix[i][0] = i;
126        }
127        for j in 0..=len2 {
128            matrix[0][j] = j;
129        }
130
131        for (i, c1) in s1.chars().enumerate() {
132            for (j, c2) in s2.chars().enumerate() {
133                let cost = usize::from(c1 != c2);
134                matrix[i + 1][j + 1] = std::cmp::min(
135                    matrix[i][j + 1] + 1, // deletion
136                    std::cmp::min(
137                        matrix[i + 1][j] + 1, // insertion
138                        matrix[i][j] + cost,  // substitution
139                    ),
140                );
141            }
142        }
143
144        matrix[len1][len2]
145    }
146
147    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
148    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
149        let (view, _plan) = self.execute_with_plan(table, sql)?;
150        Ok(view)
151    }
152
153    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
154    pub fn execute_statement(
155        &self,
156        table: Arc<DataTable>,
157        statement: SelectStatement,
158    ) -> Result<DataView> {
159        // First process CTEs to build context
160        let mut cte_context = HashMap::new();
161        for cte in &statement.ctes {
162            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
163            // Execute the CTE based on its type
164            let cte_result = match &cte.cte_type {
165                CTEType::Standard(query) => {
166                    // Execute the CTE query (it might reference earlier CTEs)
167                    let view = self.build_view_with_context(
168                        table.clone(),
169                        query.clone(),
170                        &mut cte_context,
171                    )?;
172
173                    // Materialize the view and enrich columns with qualified names
174                    let mut materialized = self.materialize_view(view)?;
175
176                    // Enrich columns with qualified names for proper scoping
177                    for column in materialized.columns_mut() {
178                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
179                        column.source_table = Some(cte.name.clone());
180                    }
181
182                    DataView::new(Arc::new(materialized))
183                }
184                CTEType::Web(web_spec) => {
185                    // Fetch data from URL
186                    use crate::web::http_fetcher::WebDataFetcher;
187
188                    let fetcher = WebDataFetcher::new()?;
189                    // Pass None for query context (no full SQL available in these contexts)
190                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
191
192                    // Enrich columns with qualified names for proper scoping
193                    for column in data_table.columns_mut() {
194                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195                        column.source_table = Some(cte.name.clone());
196                    }
197
198                    // Convert DataTable to DataView
199                    DataView::new(Arc::new(data_table))
200                }
201            };
202            // Store the result in the context for later use
203            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204            debug!(
205                "QueryEngine: CTE '{}' pre-processed, stored in context",
206                cte.name
207            );
208        }
209
210        // Now process subqueries with CTE context available
211        let mut subquery_executor =
212            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215        // Build the view with the same CTE context
216        self.build_view_with_context(table, processed_statement, &mut cte_context)
217    }
218
219    /// Execute a statement with provided CTE context (for subqueries)
220    pub fn execute_statement_with_cte_context(
221        &self,
222        table: Arc<DataTable>,
223        statement: SelectStatement,
224        cte_context: &HashMap<String, Arc<DataView>>,
225    ) -> Result<DataView> {
226        // Clone the context so we can add any CTEs from this statement
227        let mut local_context = cte_context.clone();
228
229        // Process any CTEs in this statement (they might be nested)
230        for cte in &statement.ctes {
231            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232            let cte_result = match &cte.cte_type {
233                CTEType::Standard(query) => {
234                    let view = self.build_view_with_context(
235                        table.clone(),
236                        query.clone(),
237                        &mut local_context,
238                    )?;
239
240                    // Materialize the view and enrich columns with qualified names
241                    let mut materialized = self.materialize_view(view)?;
242
243                    // Enrich columns with qualified names for proper scoping
244                    for column in materialized.columns_mut() {
245                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246                        column.source_table = Some(cte.name.clone());
247                    }
248
249                    DataView::new(Arc::new(materialized))
250                }
251                CTEType::Web(web_spec) => {
252                    // Fetch data from URL
253                    use crate::web::http_fetcher::WebDataFetcher;
254
255                    let fetcher = WebDataFetcher::new()?;
256                    // Pass None for query context (no full SQL available in these contexts)
257                    let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
258
259                    // Enrich columns with qualified names for proper scoping
260                    for column in data_table.columns_mut() {
261                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
262                        column.source_table = Some(cte.name.clone());
263                    }
264
265                    // Convert DataTable to DataView
266                    DataView::new(Arc::new(data_table))
267                }
268            };
269            local_context.insert(cte.name.clone(), Arc::new(cte_result));
270        }
271
272        // Process subqueries with the complete context
273        let mut subquery_executor =
274            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
275        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
276
277        // Build the view
278        self.build_view_with_context(table, processed_statement, &mut local_context)
279    }
280
281    /// Execute a query and return both the result and the execution plan
282    pub fn execute_with_plan(
283        &self,
284        table: Arc<DataTable>,
285        sql: &str,
286    ) -> Result<(DataView, ExecutionPlan)> {
287        let mut plan_builder = ExecutionPlanBuilder::new();
288        let start_time = Instant::now();
289
290        // Parse the SQL query
291        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
292        plan_builder.add_detail(format!("Query: {}", sql));
293        let mut parser = Parser::new(sql);
294        let statement = parser
295            .parse()
296            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
297        plan_builder.add_detail(format!("Parsed successfully"));
298        if let Some(from) = &statement.from_table {
299            plan_builder.add_detail(format!("FROM: {}", from));
300        }
301        if statement.where_clause.is_some() {
302            plan_builder.add_detail("WHERE clause present".to_string());
303        }
304        plan_builder.end_step();
305
306        // First process CTEs to build context
307        let mut cte_context = HashMap::new();
308
309        if !statement.ctes.is_empty() {
310            plan_builder.begin_step(
311                StepType::CTE,
312                format!("Process {} CTEs", statement.ctes.len()),
313            );
314
315            for cte in &statement.ctes {
316                let cte_start = Instant::now();
317                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
318
319                let cte_result = match &cte.cte_type {
320                    CTEType::Standard(query) => {
321                        // Add CTE query details
322                        if let Some(from) = &query.from_table {
323                            plan_builder.add_detail(format!("Source: {}", from));
324                        }
325                        if query.where_clause.is_some() {
326                            plan_builder.add_detail("Has WHERE clause".to_string());
327                        }
328                        if query.group_by.is_some() {
329                            plan_builder.add_detail("Has GROUP BY".to_string());
330                        }
331
332                        debug!(
333                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
334                            cte.name,
335                            cte_context.keys().collect::<Vec<_>>()
336                        );
337
338                        // Process subqueries in the CTE's query FIRST
339                        // This allows the subqueries to see all previously defined CTEs
340                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
341                            self.clone(),
342                            table.clone(),
343                            cte_context.clone(),
344                        );
345                        let processed_query = subquery_executor.execute_subqueries(query)?;
346
347                        let view = self.build_view_with_context(
348                            table.clone(),
349                            processed_query,
350                            &mut cte_context,
351                        )?;
352
353                        // Materialize the view and enrich columns with qualified names
354                        let mut materialized = self.materialize_view(view)?;
355
356                        // Enrich columns with qualified names for proper scoping
357                        for column in materialized.columns_mut() {
358                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
359                            column.source_table = Some(cte.name.clone());
360                        }
361
362                        DataView::new(Arc::new(materialized))
363                    }
364                    CTEType::Web(web_spec) => {
365                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
366                        if let Some(format) = &web_spec.format {
367                            plan_builder.add_detail(format!("Format: {:?}", format));
368                        }
369                        if let Some(cache) = web_spec.cache_seconds {
370                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
371                        }
372
373                        // Fetch data from URL
374                        use crate::web::http_fetcher::WebDataFetcher;
375
376                        let fetcher = WebDataFetcher::new()?;
377                        // Pass None for query context - each WEB CTE is independent
378                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
379
380                        // Enrich columns with qualified names for proper scoping
381                        for column in data_table.columns_mut() {
382                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
383                            column.source_table = Some(cte.name.clone());
384                        }
385
386                        // Convert DataTable to DataView
387                        DataView::new(Arc::new(data_table))
388                    }
389                };
390
391                // Record CTE statistics
392                plan_builder.set_rows_out(cte_result.row_count());
393                plan_builder.add_detail(format!(
394                    "Result: {} rows, {} columns",
395                    cte_result.row_count(),
396                    cte_result.column_count()
397                ));
398                plan_builder.add_detail(format!(
399                    "Execution time: {:.3}ms",
400                    cte_start.elapsed().as_secs_f64() * 1000.0
401                ));
402
403                debug!(
404                    "QueryEngine: Storing CTE '{}' in context with {} rows",
405                    cte.name,
406                    cte_result.row_count()
407                );
408                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
409                plan_builder.end_step();
410            }
411
412            plan_builder.add_detail(format!(
413                "All {} CTEs cached in context",
414                statement.ctes.len()
415            ));
416            plan_builder.end_step();
417        }
418
419        // Process subqueries in the statement with CTE context
420        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
421        let mut subquery_executor =
422            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
423
424        // Check if there are subqueries to process
425        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
426            // This is a simplified check - in reality we'd need to walk the AST
427            format!("{:?}", w).contains("Subquery")
428        });
429
430        if has_subqueries {
431            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
432        }
433
434        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
435
436        if has_subqueries {
437            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
438        } else {
439            plan_builder.add_detail("No subqueries to process".to_string());
440        }
441
442        plan_builder.end_step();
443        let result = self.build_view_with_context_and_plan(
444            table,
445            processed_statement,
446            &mut cte_context,
447            &mut plan_builder,
448        )?;
449
450        let total_duration = start_time.elapsed();
451        info!(
452            "Query execution complete: total={:?}, rows={}",
453            total_duration,
454            result.row_count()
455        );
456
457        let plan = plan_builder.build();
458        Ok((result, plan))
459    }
460
461    /// Build a `DataView` from a parsed SQL statement
462    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
463        let mut cte_context = HashMap::new();
464        self.build_view_with_context(table, statement, &mut cte_context)
465    }
466
467    /// Build a DataView from a SelectStatement with CTE context
468    fn build_view_with_context(
469        &self,
470        table: Arc<DataTable>,
471        statement: SelectStatement,
472        cte_context: &mut HashMap<String, Arc<DataView>>,
473    ) -> Result<DataView> {
474        let mut dummy_plan = ExecutionPlanBuilder::new();
475        self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
476    }
477
478    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
479    fn build_view_with_context_and_plan(
480        &self,
481        table: Arc<DataTable>,
482        statement: SelectStatement,
483        cte_context: &mut HashMap<String, Arc<DataView>>,
484        plan: &mut ExecutionPlanBuilder,
485    ) -> Result<DataView> {
486        // First, process any CTEs that aren't already in the context
487        for cte in &statement.ctes {
488            // Skip if already processed (e.g., by execute_select for WEB CTEs)
489            if cte_context.contains_key(&cte.name) {
490                debug!(
491                    "QueryEngine: CTE '{}' already in context, skipping",
492                    cte.name
493                );
494                continue;
495            }
496
497            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
498            debug!(
499                "QueryEngine: Available CTEs for '{}': {:?}",
500                cte.name,
501                cte_context.keys().collect::<Vec<_>>()
502            );
503
504            // Execute the CTE query (it might reference earlier CTEs)
505            let cte_result = match &cte.cte_type {
506                CTEType::Standard(query) => {
507                    let view =
508                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
509
510                    // Materialize the view and enrich columns with qualified names
511                    let mut materialized = self.materialize_view(view)?;
512
513                    // Enrich columns with qualified names for proper scoping
514                    for column in materialized.columns_mut() {
515                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
516                        column.source_table = Some(cte.name.clone());
517                    }
518
519                    DataView::new(Arc::new(materialized))
520                }
521                CTEType::Web(_web_spec) => {
522                    // Web CTEs should have been processed earlier in execute_select
523                    return Err(anyhow!(
524                        "Web CTEs should be processed in execute_select method"
525                    ));
526                }
527            };
528
529            // Store the result in the context for later use
530            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
531            debug!(
532                "QueryEngine: CTE '{}' processed, stored in context",
533                cte.name
534            );
535        }
536
537        // Determine the source table for the main query
538        let source_table = if let Some(ref table_func) = statement.from_function {
539            // Handle table functions like RANGE()
540            debug!("QueryEngine: Processing table function...");
541            match table_func {
542                TableFunction::Generator { name, args } => {
543                    // Use the generator registry to create the table
544                    use crate::sql::generators::GeneratorRegistry;
545
546                    // Create generator registry (could be cached in QueryEngine)
547                    let registry = GeneratorRegistry::new();
548
549                    if let Some(generator) = registry.get(name) {
550                        // Evaluate arguments
551                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
552                            &table,
553                            self.date_notation.clone(),
554                        );
555                        let dummy_row = 0;
556
557                        let mut evaluated_args = Vec::new();
558                        for arg in args {
559                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
560                        }
561
562                        // Generate the table
563                        generator.generate(evaluated_args)?
564                    } else {
565                        return Err(anyhow!("Unknown generator function: {}", name));
566                    }
567                }
568            }
569        } else if let Some(ref subquery) = statement.from_subquery {
570            // Execute the subquery and use its result as the source
571            debug!("QueryEngine: Processing FROM subquery...");
572            let subquery_result =
573                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
574
575            // Convert the DataView to a DataTable for use as source
576            // This materializes the subquery result
577            let materialized = self.materialize_view(subquery_result)?;
578            Arc::new(materialized)
579        } else if let Some(ref table_name) = statement.from_table {
580            // Check if this references a CTE
581            if let Some(cte_view) = cte_context.get(table_name) {
582                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
583                // Materialize the CTE view as a table
584                let mut materialized = self.materialize_view((**cte_view).clone())?;
585
586                // Apply alias to qualified column names if present
587                if let Some(ref alias) = statement.from_alias {
588                    debug!(
589                        "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
590                        alias, table_name
591                    );
592                    for column in materialized.columns_mut() {
593                        // Replace the CTE name with the alias in qualified names
594                        if let Some(ref qualified_name) = column.qualified_name {
595                            if qualified_name.starts_with(&format!("{}.", table_name)) {
596                                column.qualified_name =
597                                    Some(qualified_name.replace(
598                                        &format!("{}.", table_name),
599                                        &format!("{}.", alias),
600                                    ));
601                            }
602                        }
603                        // Update source table to reflect the alias
604                        if column.source_table.as_ref() == Some(table_name) {
605                            column.source_table = Some(alias.clone());
606                        }
607                    }
608                }
609
610                Arc::new(materialized)
611            } else {
612                // Regular table reference - use the provided table
613                table.clone()
614            }
615        } else {
616            // No FROM clause - use the provided table
617            table.clone()
618        };
619
620        // Process JOINs if present
621        let final_table = if !statement.joins.is_empty() {
622            plan.begin_step(
623                StepType::Join,
624                format!("Process {} JOINs", statement.joins.len()),
625            );
626            plan.set_rows_in(source_table.row_count());
627
628            let join_executor = HashJoinExecutor::new(self.case_insensitive);
629            let mut current_table = source_table;
630
631            for (idx, join_clause) in statement.joins.iter().enumerate() {
632                let join_start = Instant::now();
633                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
634                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
635                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
636                plan.add_detail(format!(
637                    "Executing {:?} JOIN on {} condition(s)",
638                    join_clause.join_type,
639                    join_clause.condition.conditions.len()
640                ));
641
642                // Resolve the right table for the join
643                let right_table = match &join_clause.table {
644                    TableSource::Table(name) => {
645                        // Check if it's a CTE reference
646                        if let Some(cte_view) = cte_context.get(name) {
647                            let mut materialized = self.materialize_view((**cte_view).clone())?;
648
649                            // Apply alias to qualified column names if present
650                            if let Some(ref alias) = join_clause.alias {
651                                debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
652                                for column in materialized.columns_mut() {
653                                    // Replace the CTE name with the alias in qualified names
654                                    if let Some(ref qualified_name) = column.qualified_name {
655                                        if qualified_name.starts_with(&format!("{}.", name)) {
656                                            column.qualified_name = Some(qualified_name.replace(
657                                                &format!("{}.", name),
658                                                &format!("{}.", alias),
659                                            ));
660                                        }
661                                    }
662                                    // Update source table to reflect the alias
663                                    if column.source_table.as_ref() == Some(name) {
664                                        column.source_table = Some(alias.clone());
665                                    }
666                                }
667                            }
668
669                            Arc::new(materialized)
670                        } else {
671                            // For now, we need the actual table data
672                            // In a real implementation, this would load from file
673                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
674                        }
675                    }
676                    TableSource::DerivedTable { query, alias: _ } => {
677                        // Execute the subquery
678                        let subquery_result = self.build_view_with_context(
679                            table.clone(),
680                            *query.clone(),
681                            cte_context,
682                        )?;
683                        let materialized = self.materialize_view(subquery_result)?;
684                        Arc::new(materialized)
685                    }
686                };
687
688                // Execute the join
689                let joined = join_executor.execute_join(
690                    current_table.clone(),
691                    join_clause,
692                    right_table.clone(),
693                )?;
694
695                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
696                plan.set_rows_out(joined.row_count());
697                plan.add_detail(format!("Result: {} rows", joined.row_count()));
698                plan.add_detail(format!(
699                    "Join time: {:.3}ms",
700                    join_start.elapsed().as_secs_f64() * 1000.0
701                ));
702                plan.end_step();
703
704                current_table = Arc::new(joined);
705            }
706
707            plan.set_rows_out(current_table.row_count());
708            plan.add_detail(format!(
709                "Final result after all joins: {} rows",
710                current_table.row_count()
711            ));
712            plan.end_step();
713            current_table
714        } else {
715            source_table
716        };
717
718        // Continue with the existing build_view logic but using final_table
719        self.build_view_internal_with_plan(final_table, statement, plan)
720    }
721
722    /// Materialize a DataView into a new DataTable
723    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
724        let source = view.source();
725        let mut result_table = DataTable::new("derived");
726
727        // Get the visible columns from the view
728        let visible_cols = view.visible_column_indices().to_vec();
729
730        // Copy column definitions
731        for col_idx in &visible_cols {
732            let col = &source.columns[*col_idx];
733            let new_col = DataColumn {
734                name: col.name.clone(),
735                data_type: col.data_type.clone(),
736                nullable: col.nullable,
737                unique_values: col.unique_values,
738                null_count: col.null_count,
739                metadata: col.metadata.clone(),
740                qualified_name: col.qualified_name.clone(), // Preserve qualified name
741                source_table: col.source_table.clone(),     // Preserve source table
742            };
743            result_table.add_column(new_col);
744        }
745
746        // Copy visible rows
747        for row_idx in view.visible_row_indices() {
748            let source_row = &source.rows[*row_idx];
749            let mut new_row = DataRow { values: Vec::new() };
750
751            for col_idx in &visible_cols {
752                new_row.values.push(source_row.values[*col_idx].clone());
753            }
754
755            result_table.add_row(new_row);
756        }
757
758        Ok(result_table)
759    }
760
761    fn build_view_internal(
762        &self,
763        table: Arc<DataTable>,
764        statement: SelectStatement,
765    ) -> Result<DataView> {
766        let mut dummy_plan = ExecutionPlanBuilder::new();
767        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
768    }
769
770    fn build_view_internal_with_plan(
771        &self,
772        table: Arc<DataTable>,
773        statement: SelectStatement,
774        plan: &mut ExecutionPlanBuilder,
775    ) -> Result<DataView> {
776        debug!(
777            "QueryEngine::build_view - select_items: {:?}",
778            statement.select_items
779        );
780        debug!(
781            "QueryEngine::build_view - where_clause: {:?}",
782            statement.where_clause
783        );
784
785        // Start with all rows visible
786        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
787
788        // Apply WHERE clause filtering using recursive evaluator
789        if let Some(where_clause) = &statement.where_clause {
790            let total_rows = table.row_count();
791            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
792            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
793
794            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
795            plan.set_rows_in(total_rows);
796            plan.add_detail(format!("Input: {} rows", total_rows));
797
798            // Add details about WHERE conditions
799            for condition in &where_clause.conditions {
800                plan.add_detail(format!("Condition: {:?}", condition.expr));
801            }
802
803            let filter_start = Instant::now();
804            // Create an evaluation context for caching compiled regexes
805            let mut eval_context = EvaluationContext::new(self.case_insensitive);
806
807            // Filter visible rows based on WHERE clause
808            let mut filtered_rows = Vec::new();
809            for row_idx in visible_rows {
810                // Only log for first few rows to avoid performance impact
811                if row_idx < 3 {
812                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
813                }
814                let mut evaluator =
815                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
816                match evaluator.evaluate(where_clause, row_idx) {
817                    Ok(result) => {
818                        if row_idx < 3 {
819                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
820                        }
821                        if result {
822                            filtered_rows.push(row_idx);
823                        }
824                    }
825                    Err(e) => {
826                        if row_idx < 3 {
827                            debug!(
828                                "QueryEngine: WHERE evaluation error for row {}: {}",
829                                row_idx, e
830                            );
831                        }
832                        // Propagate WHERE clause errors instead of silently ignoring them
833                        return Err(e);
834                    }
835                }
836            }
837
838            // Log regex cache statistics
839            let (compilations, cache_hits) = eval_context.get_stats();
840            if compilations > 0 || cache_hits > 0 {
841                debug!(
842                    "LIKE pattern cache: {} compilations, {} cache hits",
843                    compilations, cache_hits
844                );
845            }
846            visible_rows = filtered_rows;
847            let filter_duration = filter_start.elapsed();
848            info!(
849                "WHERE clause filtering: {} rows -> {} rows in {:?}",
850                total_rows,
851                visible_rows.len(),
852                filter_duration
853            );
854
855            plan.set_rows_out(visible_rows.len());
856            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
857            plan.add_detail(format!(
858                "Filter time: {:.3}ms",
859                filter_duration.as_secs_f64() * 1000.0
860            ));
861            plan.end_step();
862        }
863
864        // Create initial DataView with filtered rows
865        let mut view = DataView::new(table.clone());
866        view = view.with_rows(visible_rows);
867
868        // Handle GROUP BY if present
869        if let Some(group_by_exprs) = &statement.group_by {
870            if !group_by_exprs.is_empty() {
871                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
872
873                plan.begin_step(
874                    StepType::GroupBy,
875                    format!("GROUP BY {} expressions", group_by_exprs.len()),
876                );
877                plan.set_rows_in(view.row_count());
878                plan.add_detail(format!("Input: {} rows", view.row_count()));
879                for expr in group_by_exprs {
880                    plan.add_detail(format!("Group by: {:?}", expr));
881                }
882
883                let group_start = Instant::now();
884                view = self.apply_group_by(
885                    view,
886                    group_by_exprs,
887                    &statement.select_items,
888                    statement.having.as_ref(),
889                    plan,
890                )?;
891
892                plan.set_rows_out(view.row_count());
893                plan.add_detail(format!("Output: {} groups", view.row_count()));
894                plan.add_detail(format!(
895                    "Overall time: {:.3}ms",
896                    group_start.elapsed().as_secs_f64() * 1000.0
897                ));
898                plan.end_step();
899            }
900        } else {
901            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
902            if !statement.select_items.is_empty() {
903                // Check if we have ANY non-star items (not just the first one)
904                let has_non_star_items = statement
905                    .select_items
906                    .iter()
907                    .any(|item| !matches!(item, SelectItem::Star));
908
909                // Apply select items if:
910                // 1. We have computed expressions or explicit columns
911                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
912                if has_non_star_items || statement.select_items.len() > 1 {
913                    view = self.apply_select_items(view, &statement.select_items)?;
914                } else {
915                }
916                // If it's just a single star, no projection needed
917            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
918                debug!("QueryEngine: Using legacy columns path");
919                // Fallback to legacy column projection for backward compatibility
920                // Use the current view's source table, not the original table
921                let source_table = view.source();
922                let column_indices =
923                    self.resolve_column_indices(source_table, &statement.columns)?;
924                view = view.with_columns(column_indices);
925            }
926        }
927
928        // Apply DISTINCT if specified
929        if statement.distinct {
930            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
931            plan.set_rows_in(view.row_count());
932            plan.add_detail(format!("Input: {} rows", view.row_count()));
933
934            let distinct_start = Instant::now();
935            view = self.apply_distinct(view)?;
936
937            plan.set_rows_out(view.row_count());
938            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
939            plan.add_detail(format!(
940                "Distinct time: {:.3}ms",
941                distinct_start.elapsed().as_secs_f64() * 1000.0
942            ));
943            plan.end_step();
944        }
945
946        // Apply ORDER BY sorting
947        if let Some(order_by_columns) = &statement.order_by {
948            if !order_by_columns.is_empty() {
949                plan.begin_step(
950                    StepType::Sort,
951                    format!("ORDER BY {} columns", order_by_columns.len()),
952                );
953                plan.set_rows_in(view.row_count());
954                for col in order_by_columns {
955                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
956                }
957
958                let sort_start = Instant::now();
959                view = self.apply_multi_order_by(view, order_by_columns)?;
960
961                plan.add_detail(format!(
962                    "Sort time: {:.3}ms",
963                    sort_start.elapsed().as_secs_f64() * 1000.0
964                ));
965                plan.end_step();
966            }
967        }
968
969        // Apply LIMIT/OFFSET
970        if let Some(limit) = statement.limit {
971            let offset = statement.offset.unwrap_or(0);
972            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
973            plan.set_rows_in(view.row_count());
974            if offset > 0 {
975                plan.add_detail(format!("OFFSET: {}", offset));
976            }
977            view = view.with_limit(limit, offset);
978            plan.set_rows_out(view.row_count());
979            plan.add_detail(format!("Output: {} rows", view.row_count()));
980            plan.end_step();
981        }
982
983        Ok(view)
984    }
985
986    /// Resolve column names to indices
987    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
988        let mut indices = Vec::new();
989        let table_columns = table.column_names();
990
991        for col_name in columns {
992            let index = table_columns
993                .iter()
994                .position(|c| c.eq_ignore_ascii_case(col_name))
995                .ok_or_else(|| {
996                    let suggestion = self.find_similar_column(table, col_name);
997                    match suggestion {
998                        Some(similar) => anyhow::anyhow!(
999                            "Column '{}' not found. Did you mean '{}'?",
1000                            col_name,
1001                            similar
1002                        ),
1003                        None => anyhow::anyhow!("Column '{}' not found", col_name),
1004                    }
1005                })?;
1006            indices.push(index);
1007        }
1008
1009        Ok(indices)
1010    }
1011
1012    /// Apply SELECT items (columns and computed expressions) to create new view
1013    fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
1014        debug!(
1015            "QueryEngine::apply_select_items - items: {:?}",
1016            select_items
1017        );
1018        debug!(
1019            "QueryEngine::apply_select_items - input view has {} rows",
1020            view.row_count()
1021        );
1022
1023        // Check if this is an aggregate query:
1024        // 1. At least one aggregate function exists
1025        // 2. All other items are either aggregates or constants (aggregate-compatible)
1026        let has_aggregates = select_items.iter().any(|item| match item {
1027            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1028            SelectItem::Column(_) => false,
1029            SelectItem::Star => false,
1030        });
1031
1032        let all_aggregate_compatible = select_items.iter().all(|item| match item {
1033            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1034            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
1035            SelectItem::Star => false,      // Star is not aggregate-compatible
1036        });
1037
1038        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1039            // Special handling for aggregate queries with constants (no GROUP BY)
1040            // These should produce exactly one row
1041            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1042            return self.apply_aggregate_select(view, select_items);
1043        }
1044
1045        // Check if we need to create computed columns
1046        let has_computed_expressions = select_items
1047            .iter()
1048            .any(|item| matches!(item, SelectItem::Expression { .. }));
1049
1050        debug!(
1051            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1052            has_computed_expressions
1053        );
1054
1055        if !has_computed_expressions {
1056            // Simple case: only columns, use existing projection logic
1057            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1058            return Ok(view.with_columns(column_indices));
1059        }
1060
1061        // Complex case: we have computed expressions
1062        // IMPORTANT: We create a PROJECTED view, not a new table
1063        // This preserves the original DataTable reference
1064
1065        let source_table = view.source();
1066        let visible_rows = view.visible_row_indices();
1067
1068        // Create a temporary table just for the computed result view
1069        // But this table is only used for the current query result
1070        let mut computed_table = DataTable::new("query_result");
1071
1072        // First, expand any Star selectors to actual columns
1073        let mut expanded_items = Vec::new();
1074        for item in select_items {
1075            match item {
1076                SelectItem::Star => {
1077                    // Expand * to all columns from source table
1078                    for col_name in source_table.column_names() {
1079                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1080                            col_name.to_string(),
1081                        )));
1082                    }
1083                }
1084                _ => expanded_items.push(item.clone()),
1085            }
1086        }
1087
1088        // Add columns based on expanded SelectItems, handling duplicates
1089        let mut column_name_counts: std::collections::HashMap<String, usize> =
1090            std::collections::HashMap::new();
1091
1092        for item in &expanded_items {
1093            let base_name = match item {
1094                SelectItem::Column(col_ref) => col_ref.name.clone(),
1095                SelectItem::Expression { alias, .. } => alias.clone(),
1096                SelectItem::Star => unreachable!("Star should have been expanded"),
1097            };
1098
1099            // Check if this column name has been used before
1100            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1101            let column_name = if *count == 0 {
1102                // First occurrence, use the name as-is
1103                base_name.clone()
1104            } else {
1105                // Duplicate, append a suffix
1106                format!("{base_name}_{count}")
1107            };
1108            *count += 1;
1109
1110            computed_table.add_column(DataColumn::new(&column_name));
1111        }
1112
1113        // Calculate values for each row
1114        let mut evaluator =
1115            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1116
1117        for &row_idx in visible_rows {
1118            let mut row_values = Vec::new();
1119
1120            for item in &expanded_items {
1121                let value = match item {
1122                    SelectItem::Column(col_ref) => {
1123                        // Column reference with optional table prefix
1124                        let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1125                            // For qualified references, ONLY try qualified lookup - no fallback
1126                            let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1127                            let result = source_table.find_column_by_qualified_name(&qualified_name);
1128
1129                            // Debug: log what qualified names are available when lookup fails
1130                            if result.is_none() {
1131                                let available_qualified: Vec<String> = source_table.columns.iter()
1132                                    .filter_map(|c| c.qualified_name.clone())
1133                                    .collect();
1134                                let available_simple: Vec<String> = source_table.columns.iter()
1135                                    .map(|c| c.name.clone())
1136                                    .collect();
1137                                debug!(
1138                                    "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1139                                    qualified_name, available_qualified, available_simple
1140                                );
1141                                // This MUST return None to trigger error
1142                            }
1143                            result
1144                        } else {
1145                            // Simple column name lookup
1146                            source_table.get_column_index(&col_ref.name)
1147                        }
1148                        .ok_or_else(|| {
1149                            let display_name = if let Some(prefix) = &col_ref.table_prefix {
1150                                format!("{}.{}", prefix, col_ref.name)
1151                            } else {
1152                                col_ref.name.clone()
1153                            };
1154                            let suggestion = self.find_similar_column(source_table, &col_ref.name);
1155                            match suggestion {
1156                                Some(similar) => anyhow::anyhow!(
1157                                    "Column '{}' not found. Did you mean '{}'?",
1158                                    display_name,
1159                                    similar
1160                                ),
1161                                None => anyhow::anyhow!("Column '{}' not found", display_name),
1162                            }
1163                        })?;
1164                        let row = source_table
1165                            .get_row(row_idx)
1166                            .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1167                        row.get(col_idx)
1168                            .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1169                            .clone()
1170                    }
1171                    SelectItem::Expression { expr, .. } => {
1172                        // Computed expression
1173                        evaluator.evaluate(expr, row_idx)?
1174                    }
1175                    SelectItem::Star => unreachable!("Star should have been expanded"),
1176                };
1177                row_values.push(value);
1178            }
1179
1180            computed_table
1181                .add_row(DataRow::new(row_values))
1182                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1183        }
1184
1185        // Return a view of the computed result
1186        // This is a temporary view for this query only
1187        Ok(DataView::new(Arc::new(computed_table)))
1188    }
1189
1190    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1191    fn apply_aggregate_select(
1192        &self,
1193        view: DataView,
1194        select_items: &[SelectItem],
1195    ) -> Result<DataView> {
1196        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1197
1198        let source_table = view.source();
1199        let mut result_table = DataTable::new("aggregate_result");
1200
1201        // Add columns for each select item
1202        for item in select_items {
1203            let column_name = match item {
1204                SelectItem::Expression { alias, .. } => alias.clone(),
1205                _ => unreachable!("Should only have expressions in aggregate-only query"),
1206            };
1207            result_table.add_column(DataColumn::new(&column_name));
1208        }
1209
1210        // Create evaluator with visible rows from the view (for filtered aggregates)
1211        let visible_rows = view.visible_row_indices().to_vec();
1212        let mut evaluator =
1213            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1214                .with_visible_rows(visible_rows);
1215
1216        // Evaluate each aggregate expression once (they handle all rows internally)
1217        let mut row_values = Vec::new();
1218        for item in select_items {
1219            match item {
1220                SelectItem::Expression { expr, .. } => {
1221                    // The evaluator will handle aggregates over all rows
1222                    // We pass row_index=0 but aggregates ignore it and process all rows
1223                    let value = evaluator.evaluate(expr, 0)?;
1224                    row_values.push(value);
1225                }
1226                _ => unreachable!("Should only have expressions in aggregate-only query"),
1227            }
1228        }
1229
1230        // Add the single result row
1231        result_table
1232            .add_row(DataRow::new(row_values))
1233            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1234
1235        Ok(DataView::new(Arc::new(result_table)))
1236    }
1237
1238    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1239    fn resolve_select_columns(
1240        &self,
1241        table: &DataTable,
1242        select_items: &[SelectItem],
1243    ) -> Result<Vec<usize>> {
1244        let mut indices = Vec::new();
1245        let table_columns = table.column_names();
1246
1247        for item in select_items {
1248            match item {
1249                SelectItem::Column(col_ref) => {
1250                    // Check if this has a table prefix
1251                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
1252                        // For qualified references, ONLY try qualified lookup - no fallback
1253                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1254                        table.find_column_by_qualified_name(&qualified_name)
1255                            .ok_or_else(|| {
1256                                // Check if any columns have qualified names for better error message
1257                                let has_qualified = table.columns.iter()
1258                                    .any(|c| c.qualified_name.is_some());
1259                                if !has_qualified {
1260                                    anyhow::anyhow!(
1261                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1262                                        qualified_name, table_prefix
1263                                    )
1264                                } else {
1265                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
1266                                }
1267                            })?
1268                    } else {
1269                        // Simple column name lookup
1270                        table_columns
1271                            .iter()
1272                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1273                            .ok_or_else(|| {
1274                                let suggestion = self.find_similar_column(table, &col_ref.name);
1275                                match suggestion {
1276                                    Some(similar) => anyhow::anyhow!(
1277                                        "Column '{}' not found. Did you mean '{}'?",
1278                                        col_ref.name,
1279                                        similar
1280                                    ),
1281                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1282                                }
1283                            })?
1284                    };
1285                    indices.push(index);
1286                }
1287                SelectItem::Star => {
1288                    // Expand * to all column indices
1289                    for i in 0..table_columns.len() {
1290                        indices.push(i);
1291                    }
1292                }
1293                SelectItem::Expression { .. } => {
1294                    return Err(anyhow::anyhow!(
1295                        "Computed expressions require new table creation"
1296                    ));
1297                }
1298            }
1299        }
1300
1301        Ok(indices)
1302    }
1303
1304    /// Apply DISTINCT to remove duplicate rows
1305    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1306        use std::collections::HashSet;
1307
1308        let source = view.source();
1309        let visible_cols = view.visible_column_indices();
1310        let visible_rows = view.visible_row_indices();
1311
1312        // Build a set to track unique rows
1313        let mut seen_rows = HashSet::new();
1314        let mut unique_row_indices = Vec::new();
1315
1316        for &row_idx in visible_rows {
1317            // Build a key representing this row's visible column values
1318            let mut row_key = Vec::new();
1319            for &col_idx in visible_cols {
1320                let value = source
1321                    .get_value(row_idx, col_idx)
1322                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1323                // Convert value to a hashable representation
1324                row_key.push(format!("{:?}", value));
1325            }
1326
1327            // Check if we've seen this row before
1328            if seen_rows.insert(row_key) {
1329                // First time seeing this row combination
1330                unique_row_indices.push(row_idx);
1331            }
1332        }
1333
1334        // Create a new view with only unique rows
1335        Ok(view.with_rows(unique_row_indices))
1336    }
1337
1338    /// Apply multi-column ORDER BY sorting to the view
1339    fn apply_multi_order_by(
1340        &self,
1341        mut view: DataView,
1342        order_by_columns: &[OrderByColumn],
1343    ) -> Result<DataView> {
1344        // Build list of (source_column_index, ascending) tuples
1345        let mut sort_columns = Vec::new();
1346
1347        for order_col in order_by_columns {
1348            // Try to find the column index
1349            // Check in the current view's source table (this handles both regular columns and computed columns)
1350            // This is especially important after GROUP BY where we have a new result table with aggregate aliases
1351            let col_index = view
1352                .source()
1353                .get_column_index(&order_col.column)
1354                .ok_or_else(|| {
1355                    // If not found, provide helpful error with suggestions
1356                    let suggestion = self.find_similar_column(view.source(), &order_col.column);
1357                    match suggestion {
1358                        Some(similar) => anyhow::anyhow!(
1359                            "Column '{}' not found. Did you mean '{}'?",
1360                            order_col.column,
1361                            similar
1362                        ),
1363                        None => {
1364                            // Also list available columns for debugging
1365                            let available_cols = view.source().column_names().join(", ");
1366                            anyhow::anyhow!(
1367                                "Column '{}' not found. Available columns: {}",
1368                                order_col.column,
1369                                available_cols
1370                            )
1371                        }
1372                    }
1373                })?;
1374
1375            let ascending = matches!(order_col.direction, SortDirection::Asc);
1376            sort_columns.push((col_index, ascending));
1377        }
1378
1379        // Apply multi-column sorting
1380        view.apply_multi_sort(&sort_columns)?;
1381        Ok(view)
1382    }
1383
1384    /// Apply GROUP BY to the view with optional HAVING clause
1385    fn apply_group_by(
1386        &self,
1387        view: DataView,
1388        group_by_exprs: &[SqlExpression],
1389        select_items: &[SelectItem],
1390        having: Option<&SqlExpression>,
1391        plan: &mut ExecutionPlanBuilder,
1392    ) -> Result<DataView> {
1393        // Use the new expression-based GROUP BY implementation
1394        let (result_view, phase_info) = self.apply_group_by_expressions(
1395            view,
1396            group_by_exprs,
1397            select_items,
1398            having,
1399            self.case_insensitive,
1400            self.date_notation.clone(),
1401        )?;
1402
1403        // Add detailed phase information to the execution plan
1404        plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1405        plan.add_detail(format!(
1406            "Phase 1 - Group Building: {:.3}ms",
1407            phase_info.phase2_key_building.as_secs_f64() * 1000.0
1408        ));
1409        plan.add_detail(format!(
1410            "  • Processing {} rows into {} groups",
1411            phase_info.total_rows, phase_info.num_groups
1412        ));
1413        plan.add_detail(format!(
1414            "Phase 2 - Aggregation: {:.3}ms",
1415            phase_info.phase4_aggregation.as_secs_f64() * 1000.0
1416        ));
1417        if phase_info.phase4_having_evaluation > Duration::ZERO {
1418            plan.add_detail(format!(
1419                "Phase 3 - HAVING Filter: {:.3}ms",
1420                phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
1421            ));
1422            plan.add_detail(format!(
1423                "  • Filtered {} groups",
1424                phase_info.groups_filtered_by_having
1425            ));
1426        }
1427        plan.add_detail(format!(
1428            "Total GROUP BY time: {:.3}ms",
1429            phase_info.total_time.as_secs_f64() * 1000.0
1430        ));
1431
1432        Ok(result_view)
1433    }
1434
1435    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
1436    /// This helps pre-size hash tables for better performance
1437    pub fn estimate_group_cardinality(
1438        &self,
1439        view: &DataView,
1440        group_by_exprs: &[SqlExpression],
1441    ) -> usize {
1442        // If we have few rows, just return the row count as upper bound
1443        let row_count = view.get_visible_rows().len();
1444        if row_count <= 100 {
1445            return row_count;
1446        }
1447
1448        // Sample first 1000 rows or 10% of data, whichever is smaller
1449        let sample_size = min(1000, row_count / 10).max(100);
1450        let mut seen = FxHashSet::default();
1451
1452        let visible_rows = view.get_visible_rows();
1453        for (i, &row_idx) in visible_rows.iter().enumerate() {
1454            if i >= sample_size {
1455                break;
1456            }
1457
1458            // Evaluate GROUP BY expressions for this row
1459            let mut key_values = Vec::new();
1460            for expr in group_by_exprs {
1461                let mut evaluator = ArithmeticEvaluator::new(view.source());
1462                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1463                key_values.push(value);
1464            }
1465
1466            seen.insert(key_values);
1467        }
1468
1469        // Estimate total cardinality based on sample
1470        let sample_cardinality = seen.len();
1471        let estimated = (sample_cardinality * row_count) / sample_size;
1472
1473        // Cap at row count and ensure minimum of sample cardinality
1474        estimated.min(row_count).max(sample_cardinality)
1475    }
1476}
1477
1478#[cfg(test)]
1479mod tests {
1480    use super::*;
1481    use crate::data::datatable::{DataColumn, DataRow, DataValue};
1482
1483    fn create_test_table() -> Arc<DataTable> {
1484        let mut table = DataTable::new("test");
1485
1486        // Add columns
1487        table.add_column(DataColumn::new("id"));
1488        table.add_column(DataColumn::new("name"));
1489        table.add_column(DataColumn::new("age"));
1490
1491        // Add rows
1492        table
1493            .add_row(DataRow::new(vec![
1494                DataValue::Integer(1),
1495                DataValue::String("Alice".to_string()),
1496                DataValue::Integer(30),
1497            ]))
1498            .unwrap();
1499
1500        table
1501            .add_row(DataRow::new(vec![
1502                DataValue::Integer(2),
1503                DataValue::String("Bob".to_string()),
1504                DataValue::Integer(25),
1505            ]))
1506            .unwrap();
1507
1508        table
1509            .add_row(DataRow::new(vec![
1510                DataValue::Integer(3),
1511                DataValue::String("Charlie".to_string()),
1512                DataValue::Integer(35),
1513            ]))
1514            .unwrap();
1515
1516        Arc::new(table)
1517    }
1518
1519    #[test]
1520    fn test_select_all() {
1521        let table = create_test_table();
1522        let engine = QueryEngine::new();
1523
1524        let view = engine
1525            .execute(table.clone(), "SELECT * FROM users")
1526            .unwrap();
1527        assert_eq!(view.row_count(), 3);
1528        assert_eq!(view.column_count(), 3);
1529    }
1530
1531    #[test]
1532    fn test_select_columns() {
1533        let table = create_test_table();
1534        let engine = QueryEngine::new();
1535
1536        let view = engine
1537            .execute(table.clone(), "SELECT name, age FROM users")
1538            .unwrap();
1539        assert_eq!(view.row_count(), 3);
1540        assert_eq!(view.column_count(), 2);
1541    }
1542
1543    #[test]
1544    fn test_select_with_limit() {
1545        let table = create_test_table();
1546        let engine = QueryEngine::new();
1547
1548        let view = engine
1549            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1550            .unwrap();
1551        assert_eq!(view.row_count(), 2);
1552    }
1553
1554    #[test]
1555    fn test_type_coercion_contains() {
1556        // Initialize tracing for debug output
1557        let _ = tracing_subscriber::fmt()
1558            .with_max_level(tracing::Level::DEBUG)
1559            .try_init();
1560
1561        let mut table = DataTable::new("test");
1562        table.add_column(DataColumn::new("id"));
1563        table.add_column(DataColumn::new("status"));
1564        table.add_column(DataColumn::new("price"));
1565
1566        // Add test data with mixed types
1567        table
1568            .add_row(DataRow::new(vec![
1569                DataValue::Integer(1),
1570                DataValue::String("Pending".to_string()),
1571                DataValue::Float(99.99),
1572            ]))
1573            .unwrap();
1574
1575        table
1576            .add_row(DataRow::new(vec![
1577                DataValue::Integer(2),
1578                DataValue::String("Confirmed".to_string()),
1579                DataValue::Float(150.50),
1580            ]))
1581            .unwrap();
1582
1583        table
1584            .add_row(DataRow::new(vec![
1585                DataValue::Integer(3),
1586                DataValue::String("Pending".to_string()),
1587                DataValue::Float(75.00),
1588            ]))
1589            .unwrap();
1590
1591        let table = Arc::new(table);
1592        let engine = QueryEngine::new();
1593
1594        println!("\n=== Testing WHERE clause with Contains ===");
1595        println!("Table has {} rows", table.row_count());
1596        for i in 0..table.row_count() {
1597            let status = table.get_value(i, 1);
1598            println!("Row {i}: status = {status:?}");
1599        }
1600
1601        // Test 1: Basic string contains (should work)
1602        println!("\n--- Test 1: status.Contains('pend') ---");
1603        let result = engine.execute(
1604            table.clone(),
1605            "SELECT * FROM test WHERE status.Contains('pend')",
1606        );
1607        match result {
1608            Ok(view) => {
1609                println!("SUCCESS: Found {} matching rows", view.row_count());
1610                assert_eq!(view.row_count(), 2); // Should find both Pending rows
1611            }
1612            Err(e) => {
1613                panic!("Query failed: {e}");
1614            }
1615        }
1616
1617        // Test 2: Numeric contains (should work with type coercion)
1618        println!("\n--- Test 2: price.Contains('9') ---");
1619        let result = engine.execute(
1620            table.clone(),
1621            "SELECT * FROM test WHERE price.Contains('9')",
1622        );
1623        match result {
1624            Ok(view) => {
1625                println!(
1626                    "SUCCESS: Found {} matching rows with price containing '9'",
1627                    view.row_count()
1628                );
1629                // Should find 99.99 row
1630                assert!(view.row_count() >= 1);
1631            }
1632            Err(e) => {
1633                panic!("Numeric coercion query failed: {e}");
1634            }
1635        }
1636
1637        println!("\n=== All tests passed! ===");
1638    }
1639
1640    #[test]
1641    fn test_not_in_clause() {
1642        // Initialize tracing for debug output
1643        let _ = tracing_subscriber::fmt()
1644            .with_max_level(tracing::Level::DEBUG)
1645            .try_init();
1646
1647        let mut table = DataTable::new("test");
1648        table.add_column(DataColumn::new("id"));
1649        table.add_column(DataColumn::new("country"));
1650
1651        // Add test data
1652        table
1653            .add_row(DataRow::new(vec![
1654                DataValue::Integer(1),
1655                DataValue::String("CA".to_string()),
1656            ]))
1657            .unwrap();
1658
1659        table
1660            .add_row(DataRow::new(vec![
1661                DataValue::Integer(2),
1662                DataValue::String("US".to_string()),
1663            ]))
1664            .unwrap();
1665
1666        table
1667            .add_row(DataRow::new(vec![
1668                DataValue::Integer(3),
1669                DataValue::String("UK".to_string()),
1670            ]))
1671            .unwrap();
1672
1673        let table = Arc::new(table);
1674        let engine = QueryEngine::new();
1675
1676        println!("\n=== Testing NOT IN clause ===");
1677        println!("Table has {} rows", table.row_count());
1678        for i in 0..table.row_count() {
1679            let country = table.get_value(i, 1);
1680            println!("Row {i}: country = {country:?}");
1681        }
1682
1683        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
1684        println!("\n--- Test: country NOT IN ('CA') ---");
1685        let result = engine.execute(
1686            table.clone(),
1687            "SELECT * FROM test WHERE country NOT IN ('CA')",
1688        );
1689        match result {
1690            Ok(view) => {
1691                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1692                assert_eq!(view.row_count(), 2); // Should find US and UK
1693            }
1694            Err(e) => {
1695                panic!("NOT IN query failed: {e}");
1696            }
1697        }
1698
1699        println!("\n=== NOT IN test complete! ===");
1700    }
1701
1702    #[test]
1703    fn test_case_insensitive_in_and_not_in() {
1704        // Initialize tracing for debug output
1705        let _ = tracing_subscriber::fmt()
1706            .with_max_level(tracing::Level::DEBUG)
1707            .try_init();
1708
1709        let mut table = DataTable::new("test");
1710        table.add_column(DataColumn::new("id"));
1711        table.add_column(DataColumn::new("country"));
1712
1713        // Add test data with mixed case
1714        table
1715            .add_row(DataRow::new(vec![
1716                DataValue::Integer(1),
1717                DataValue::String("CA".to_string()), // uppercase
1718            ]))
1719            .unwrap();
1720
1721        table
1722            .add_row(DataRow::new(vec![
1723                DataValue::Integer(2),
1724                DataValue::String("us".to_string()), // lowercase
1725            ]))
1726            .unwrap();
1727
1728        table
1729            .add_row(DataRow::new(vec![
1730                DataValue::Integer(3),
1731                DataValue::String("UK".to_string()), // uppercase
1732            ]))
1733            .unwrap();
1734
1735        let table = Arc::new(table);
1736
1737        println!("\n=== Testing Case-Insensitive IN clause ===");
1738        println!("Table has {} rows", table.row_count());
1739        for i in 0..table.row_count() {
1740            let country = table.get_value(i, 1);
1741            println!("Row {i}: country = {country:?}");
1742        }
1743
1744        // Test case-insensitive IN - should match 'CA' with 'ca'
1745        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
1746        let engine = QueryEngine::with_case_insensitive(true);
1747        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1748        match result {
1749            Ok(view) => {
1750                println!(
1751                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
1752                    view.row_count()
1753                );
1754                assert_eq!(view.row_count(), 1); // Should find CA row
1755            }
1756            Err(e) => {
1757                panic!("Case-insensitive IN query failed: {e}");
1758            }
1759        }
1760
1761        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
1762        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
1763        let result = engine.execute(
1764            table.clone(),
1765            "SELECT * FROM test WHERE country NOT IN ('ca')",
1766        );
1767        match result {
1768            Ok(view) => {
1769                println!(
1770                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
1771                    view.row_count()
1772                );
1773                assert_eq!(view.row_count(), 2); // Should find us and UK rows
1774            }
1775            Err(e) => {
1776                panic!("Case-insensitive NOT IN query failed: {e}");
1777            }
1778        }
1779
1780        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
1781        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
1782        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
1783        let result = engine_case_sensitive
1784            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1785        match result {
1786            Ok(view) => {
1787                println!(
1788                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
1789                    view.row_count()
1790                );
1791                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
1792            }
1793            Err(e) => {
1794                panic!("Case-sensitive IN query failed: {e}");
1795            }
1796        }
1797
1798        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
1799    }
1800
1801    #[test]
1802    #[ignore = "Parentheses in WHERE clause not yet implemented"]
1803    fn test_parentheses_in_where_clause() {
1804        // Initialize tracing for debug output
1805        let _ = tracing_subscriber::fmt()
1806            .with_max_level(tracing::Level::DEBUG)
1807            .try_init();
1808
1809        let mut table = DataTable::new("test");
1810        table.add_column(DataColumn::new("id"));
1811        table.add_column(DataColumn::new("status"));
1812        table.add_column(DataColumn::new("priority"));
1813
1814        // Add test data
1815        table
1816            .add_row(DataRow::new(vec![
1817                DataValue::Integer(1),
1818                DataValue::String("Pending".to_string()),
1819                DataValue::String("High".to_string()),
1820            ]))
1821            .unwrap();
1822
1823        table
1824            .add_row(DataRow::new(vec![
1825                DataValue::Integer(2),
1826                DataValue::String("Complete".to_string()),
1827                DataValue::String("High".to_string()),
1828            ]))
1829            .unwrap();
1830
1831        table
1832            .add_row(DataRow::new(vec![
1833                DataValue::Integer(3),
1834                DataValue::String("Pending".to_string()),
1835                DataValue::String("Low".to_string()),
1836            ]))
1837            .unwrap();
1838
1839        table
1840            .add_row(DataRow::new(vec![
1841                DataValue::Integer(4),
1842                DataValue::String("Complete".to_string()),
1843                DataValue::String("Low".to_string()),
1844            ]))
1845            .unwrap();
1846
1847        let table = Arc::new(table);
1848        let engine = QueryEngine::new();
1849
1850        println!("\n=== Testing Parentheses in WHERE clause ===");
1851        println!("Table has {} rows", table.row_count());
1852        for i in 0..table.row_count() {
1853            let status = table.get_value(i, 1);
1854            let priority = table.get_value(i, 2);
1855            println!("Row {i}: status = {status:?}, priority = {priority:?}");
1856        }
1857
1858        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
1859        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
1860        let result = engine.execute(
1861            table.clone(),
1862            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
1863        );
1864        match result {
1865            Ok(view) => {
1866                println!(
1867                    "SUCCESS: Found {} rows with parenthetical logic",
1868                    view.row_count()
1869                );
1870                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
1871            }
1872            Err(e) => {
1873                panic!("Parentheses query failed: {e}");
1874            }
1875        }
1876
1877        println!("\n=== Parentheses test complete! ===");
1878    }
1879
1880    #[test]
1881    #[ignore = "Numeric type coercion needs fixing"]
1882    fn test_numeric_type_coercion() {
1883        // Initialize tracing for debug output
1884        let _ = tracing_subscriber::fmt()
1885            .with_max_level(tracing::Level::DEBUG)
1886            .try_init();
1887
1888        let mut table = DataTable::new("test");
1889        table.add_column(DataColumn::new("id"));
1890        table.add_column(DataColumn::new("price"));
1891        table.add_column(DataColumn::new("quantity"));
1892
1893        // Add test data with different numeric types
1894        table
1895            .add_row(DataRow::new(vec![
1896                DataValue::Integer(1),
1897                DataValue::Float(99.50), // Contains '.'
1898                DataValue::Integer(100),
1899            ]))
1900            .unwrap();
1901
1902        table
1903            .add_row(DataRow::new(vec![
1904                DataValue::Integer(2),
1905                DataValue::Float(150.0), // Contains '.' and '0'
1906                DataValue::Integer(200),
1907            ]))
1908            .unwrap();
1909
1910        table
1911            .add_row(DataRow::new(vec![
1912                DataValue::Integer(3),
1913                DataValue::Integer(75), // No decimal point
1914                DataValue::Integer(50),
1915            ]))
1916            .unwrap();
1917
1918        let table = Arc::new(table);
1919        let engine = QueryEngine::new();
1920
1921        println!("\n=== Testing Numeric Type Coercion ===");
1922        println!("Table has {} rows", table.row_count());
1923        for i in 0..table.row_count() {
1924            let price = table.get_value(i, 1);
1925            let quantity = table.get_value(i, 2);
1926            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
1927        }
1928
1929        // Test Contains on float values - should find rows with decimal points
1930        println!("\n--- Test: price.Contains('.') ---");
1931        let result = engine.execute(
1932            table.clone(),
1933            "SELECT * FROM test WHERE price.Contains('.')",
1934        );
1935        match result {
1936            Ok(view) => {
1937                println!(
1938                    "SUCCESS: Found {} rows with decimal points in price",
1939                    view.row_count()
1940                );
1941                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
1942            }
1943            Err(e) => {
1944                panic!("Numeric Contains query failed: {e}");
1945            }
1946        }
1947
1948        // Test Contains on integer values converted to string
1949        println!("\n--- Test: quantity.Contains('0') ---");
1950        let result = engine.execute(
1951            table.clone(),
1952            "SELECT * FROM test WHERE quantity.Contains('0')",
1953        );
1954        match result {
1955            Ok(view) => {
1956                println!(
1957                    "SUCCESS: Found {} rows with '0' in quantity",
1958                    view.row_count()
1959                );
1960                assert_eq!(view.row_count(), 2); // Should find 100 and 200
1961            }
1962            Err(e) => {
1963                panic!("Integer Contains query failed: {e}");
1964            }
1965        }
1966
1967        println!("\n=== Numeric type coercion test complete! ===");
1968    }
1969
1970    #[test]
1971    fn test_datetime_comparisons() {
1972        // Initialize tracing for debug output
1973        let _ = tracing_subscriber::fmt()
1974            .with_max_level(tracing::Level::DEBUG)
1975            .try_init();
1976
1977        let mut table = DataTable::new("test");
1978        table.add_column(DataColumn::new("id"));
1979        table.add_column(DataColumn::new("created_date"));
1980
1981        // Add test data with date strings (as they would come from CSV)
1982        table
1983            .add_row(DataRow::new(vec![
1984                DataValue::Integer(1),
1985                DataValue::String("2024-12-15".to_string()),
1986            ]))
1987            .unwrap();
1988
1989        table
1990            .add_row(DataRow::new(vec![
1991                DataValue::Integer(2),
1992                DataValue::String("2025-01-15".to_string()),
1993            ]))
1994            .unwrap();
1995
1996        table
1997            .add_row(DataRow::new(vec![
1998                DataValue::Integer(3),
1999                DataValue::String("2025-02-15".to_string()),
2000            ]))
2001            .unwrap();
2002
2003        let table = Arc::new(table);
2004        let engine = QueryEngine::new();
2005
2006        println!("\n=== Testing DateTime Comparisons ===");
2007        println!("Table has {} rows", table.row_count());
2008        for i in 0..table.row_count() {
2009            let date = table.get_value(i, 1);
2010            println!("Row {i}: created_date = {date:?}");
2011        }
2012
2013        // Test DateTime constructor comparison - should find dates after 2025-01-01
2014        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2015        let result = engine.execute(
2016            table.clone(),
2017            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2018        );
2019        match result {
2020            Ok(view) => {
2021                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2022                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
2023            }
2024            Err(e) => {
2025                panic!("DateTime comparison query failed: {e}");
2026            }
2027        }
2028
2029        println!("\n=== DateTime comparison test complete! ===");
2030    }
2031
2032    #[test]
2033    fn test_not_with_method_calls() {
2034        // Initialize tracing for debug output
2035        let _ = tracing_subscriber::fmt()
2036            .with_max_level(tracing::Level::DEBUG)
2037            .try_init();
2038
2039        let mut table = DataTable::new("test");
2040        table.add_column(DataColumn::new("id"));
2041        table.add_column(DataColumn::new("status"));
2042
2043        // Add test data
2044        table
2045            .add_row(DataRow::new(vec![
2046                DataValue::Integer(1),
2047                DataValue::String("Pending Review".to_string()),
2048            ]))
2049            .unwrap();
2050
2051        table
2052            .add_row(DataRow::new(vec![
2053                DataValue::Integer(2),
2054                DataValue::String("Complete".to_string()),
2055            ]))
2056            .unwrap();
2057
2058        table
2059            .add_row(DataRow::new(vec![
2060                DataValue::Integer(3),
2061                DataValue::String("Pending Approval".to_string()),
2062            ]))
2063            .unwrap();
2064
2065        let table = Arc::new(table);
2066        let engine = QueryEngine::with_case_insensitive(true);
2067
2068        println!("\n=== Testing NOT with Method Calls ===");
2069        println!("Table has {} rows", table.row_count());
2070        for i in 0..table.row_count() {
2071            let status = table.get_value(i, 1);
2072            println!("Row {i}: status = {status:?}");
2073        }
2074
2075        // Test NOT with Contains - should exclude rows containing "pend"
2076        println!("\n--- Test: NOT status.Contains('pend') ---");
2077        let result = engine.execute(
2078            table.clone(),
2079            "SELECT * FROM test WHERE NOT status.Contains('pend')",
2080        );
2081        match result {
2082            Ok(view) => {
2083                println!(
2084                    "SUCCESS: Found {} rows NOT containing 'pend'",
2085                    view.row_count()
2086                );
2087                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2088            }
2089            Err(e) => {
2090                panic!("NOT Contains query failed: {e}");
2091            }
2092        }
2093
2094        // Test NOT with StartsWith
2095        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2096        let result = engine.execute(
2097            table.clone(),
2098            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2099        );
2100        match result {
2101            Ok(view) => {
2102                println!(
2103                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
2104                    view.row_count()
2105                );
2106                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2107            }
2108            Err(e) => {
2109                panic!("NOT StartsWith query failed: {e}");
2110            }
2111        }
2112
2113        println!("\n=== NOT with method calls test complete! ===");
2114    }
2115
2116    #[test]
2117    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2118    fn test_complex_logical_expressions() {
2119        // Initialize tracing for debug output
2120        let _ = tracing_subscriber::fmt()
2121            .with_max_level(tracing::Level::DEBUG)
2122            .try_init();
2123
2124        let mut table = DataTable::new("test");
2125        table.add_column(DataColumn::new("id"));
2126        table.add_column(DataColumn::new("status"));
2127        table.add_column(DataColumn::new("priority"));
2128        table.add_column(DataColumn::new("assigned"));
2129
2130        // Add comprehensive test data
2131        table
2132            .add_row(DataRow::new(vec![
2133                DataValue::Integer(1),
2134                DataValue::String("Pending".to_string()),
2135                DataValue::String("High".to_string()),
2136                DataValue::String("John".to_string()),
2137            ]))
2138            .unwrap();
2139
2140        table
2141            .add_row(DataRow::new(vec![
2142                DataValue::Integer(2),
2143                DataValue::String("Complete".to_string()),
2144                DataValue::String("High".to_string()),
2145                DataValue::String("Jane".to_string()),
2146            ]))
2147            .unwrap();
2148
2149        table
2150            .add_row(DataRow::new(vec![
2151                DataValue::Integer(3),
2152                DataValue::String("Pending".to_string()),
2153                DataValue::String("Low".to_string()),
2154                DataValue::String("John".to_string()),
2155            ]))
2156            .unwrap();
2157
2158        table
2159            .add_row(DataRow::new(vec![
2160                DataValue::Integer(4),
2161                DataValue::String("In Progress".to_string()),
2162                DataValue::String("Medium".to_string()),
2163                DataValue::String("Jane".to_string()),
2164            ]))
2165            .unwrap();
2166
2167        let table = Arc::new(table);
2168        let engine = QueryEngine::new();
2169
2170        println!("\n=== Testing Complex Logical Expressions ===");
2171        println!("Table has {} rows", table.row_count());
2172        for i in 0..table.row_count() {
2173            let status = table.get_value(i, 1);
2174            let priority = table.get_value(i, 2);
2175            let assigned = table.get_value(i, 3);
2176            println!(
2177                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2178            );
2179        }
2180
2181        // Test complex AND/OR logic
2182        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2183        let result = engine.execute(
2184            table.clone(),
2185            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2186        );
2187        match result {
2188            Ok(view) => {
2189                println!(
2190                    "SUCCESS: Found {} rows with complex logic",
2191                    view.row_count()
2192                );
2193                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2194            }
2195            Err(e) => {
2196                panic!("Complex logic query failed: {e}");
2197            }
2198        }
2199
2200        // Test NOT with complex expressions
2201        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2202        let result = engine.execute(
2203            table.clone(),
2204            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2205        );
2206        match result {
2207            Ok(view) => {
2208                println!(
2209                    "SUCCESS: Found {} rows with NOT complex logic",
2210                    view.row_count()
2211                );
2212                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2213            }
2214            Err(e) => {
2215                panic!("NOT complex logic query failed: {e}");
2216            }
2217        }
2218
2219        println!("\n=== Complex logical expressions test complete! ===");
2220    }
2221
2222    #[test]
2223    fn test_mixed_data_types_and_edge_cases() {
2224        // Initialize tracing for debug output
2225        let _ = tracing_subscriber::fmt()
2226            .with_max_level(tracing::Level::DEBUG)
2227            .try_init();
2228
2229        let mut table = DataTable::new("test");
2230        table.add_column(DataColumn::new("id"));
2231        table.add_column(DataColumn::new("value"));
2232        table.add_column(DataColumn::new("nullable_field"));
2233
2234        // Add test data with mixed types and edge cases
2235        table
2236            .add_row(DataRow::new(vec![
2237                DataValue::Integer(1),
2238                DataValue::String("123.45".to_string()),
2239                DataValue::String("present".to_string()),
2240            ]))
2241            .unwrap();
2242
2243        table
2244            .add_row(DataRow::new(vec![
2245                DataValue::Integer(2),
2246                DataValue::Float(678.90),
2247                DataValue::Null,
2248            ]))
2249            .unwrap();
2250
2251        table
2252            .add_row(DataRow::new(vec![
2253                DataValue::Integer(3),
2254                DataValue::Boolean(true),
2255                DataValue::String("also present".to_string()),
2256            ]))
2257            .unwrap();
2258
2259        table
2260            .add_row(DataRow::new(vec![
2261                DataValue::Integer(4),
2262                DataValue::String("false".to_string()),
2263                DataValue::Null,
2264            ]))
2265            .unwrap();
2266
2267        let table = Arc::new(table);
2268        let engine = QueryEngine::new();
2269
2270        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2271        println!("Table has {} rows", table.row_count());
2272        for i in 0..table.row_count() {
2273            let value = table.get_value(i, 1);
2274            let nullable = table.get_value(i, 2);
2275            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2276        }
2277
2278        // Test type coercion with boolean Contains
2279        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2280        let result = engine.execute(
2281            table.clone(),
2282            "SELECT * FROM test WHERE value.Contains('true')",
2283        );
2284        match result {
2285            Ok(view) => {
2286                println!(
2287                    "SUCCESS: Found {} rows with boolean coercion",
2288                    view.row_count()
2289                );
2290                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2291            }
2292            Err(e) => {
2293                panic!("Boolean coercion query failed: {e}");
2294            }
2295        }
2296
2297        // Test multiple IN values with mixed types
2298        println!("\n--- Test: id IN (1, 3) ---");
2299        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2300        match result {
2301            Ok(view) => {
2302                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2303                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2304            }
2305            Err(e) => {
2306                panic!("Multiple IN values query failed: {e}");
2307            }
2308        }
2309
2310        println!("\n=== Mixed data types test complete! ===");
2311    }
2312
2313    /// Test that aggregate-only queries return exactly one row (regression test)
2314    #[test]
2315    fn test_aggregate_only_single_row() {
2316        let table = create_test_stock_data();
2317        let engine = QueryEngine::new();
2318
2319        // Test query with multiple aggregates - should return exactly 1 row
2320        let result = engine
2321            .execute(
2322                table.clone(),
2323                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2324            )
2325            .expect("Query should succeed");
2326
2327        assert_eq!(
2328            result.row_count(),
2329            1,
2330            "Aggregate-only query should return exactly 1 row"
2331        );
2332        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2333
2334        // Verify the actual values are correct
2335        let source = result.source();
2336        let row = source.get_row(0).expect("Should have first row");
2337
2338        // COUNT(*) should be 5 (total rows)
2339        assert_eq!(row.values[0], DataValue::Integer(5));
2340
2341        // MIN should be 99.5
2342        assert_eq!(row.values[1], DataValue::Float(99.5));
2343
2344        // MAX should be 105.0
2345        assert_eq!(row.values[2], DataValue::Float(105.0));
2346
2347        // AVG should be approximately 102.4
2348        if let DataValue::Float(avg) = &row.values[3] {
2349            assert!(
2350                (avg - 102.4).abs() < 0.01,
2351                "Average should be approximately 102.4, got {}",
2352                avg
2353            );
2354        } else {
2355            panic!("AVG should return a Float value");
2356        }
2357    }
2358
2359    /// Test single aggregate function returns single row
2360    #[test]
2361    fn test_single_aggregate_single_row() {
2362        let table = create_test_stock_data();
2363        let engine = QueryEngine::new();
2364
2365        let result = engine
2366            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2367            .expect("Query should succeed");
2368
2369        assert_eq!(
2370            result.row_count(),
2371            1,
2372            "Single aggregate query should return exactly 1 row"
2373        );
2374        assert_eq!(result.column_count(), 1, "Should have 1 column");
2375
2376        let source = result.source();
2377        let row = source.get_row(0).expect("Should have first row");
2378        assert_eq!(row.values[0], DataValue::Integer(5));
2379    }
2380
2381    /// Test aggregate with WHERE clause filtering
2382    #[test]
2383    fn test_aggregate_with_where_single_row() {
2384        let table = create_test_stock_data();
2385        let engine = QueryEngine::new();
2386
2387        // Filter to only high-value stocks (>= 103.0) and aggregate
2388        let result = engine
2389            .execute(
2390                table.clone(),
2391                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2392            )
2393            .expect("Query should succeed");
2394
2395        assert_eq!(
2396            result.row_count(),
2397            1,
2398            "Filtered aggregate query should return exactly 1 row"
2399        );
2400        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2401
2402        let source = result.source();
2403        let row = source.get_row(0).expect("Should have first row");
2404
2405        // Should find 2 rows (103.5 and 105.0)
2406        assert_eq!(row.values[0], DataValue::Integer(2));
2407        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2408        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2409    }
2410
2411    #[test]
2412    fn test_not_in_parsing() {
2413        use crate::sql::recursive_parser::Parser;
2414
2415        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2416        println!("\n=== Testing NOT IN parsing ===");
2417        println!("Parsing query: {query}");
2418
2419        let mut parser = Parser::new(query);
2420        match parser.parse() {
2421            Ok(statement) => {
2422                println!("Parsed statement: {statement:#?}");
2423                if let Some(where_clause) = statement.where_clause {
2424                    println!("WHERE conditions: {:#?}", where_clause.conditions);
2425                    if let Some(first_condition) = where_clause.conditions.first() {
2426                        println!("First condition expression: {:#?}", first_condition.expr);
2427                    }
2428                }
2429            }
2430            Err(e) => {
2431                panic!("Parse error: {e}");
2432            }
2433        }
2434    }
2435
2436    /// Create test stock data for aggregate testing
2437    fn create_test_stock_data() -> Arc<DataTable> {
2438        let mut table = DataTable::new("stock");
2439
2440        table.add_column(DataColumn::new("symbol"));
2441        table.add_column(DataColumn::new("close"));
2442        table.add_column(DataColumn::new("volume"));
2443
2444        // Add 5 rows of test data
2445        let test_data = vec![
2446            ("AAPL", 99.5, 1000),
2447            ("AAPL", 101.2, 1500),
2448            ("AAPL", 103.5, 2000),
2449            ("AAPL", 105.0, 1200),
2450            ("AAPL", 102.8, 1800),
2451        ];
2452
2453        for (symbol, close, volume) in test_data {
2454            table
2455                .add_row(DataRow::new(vec![
2456                    DataValue::String(symbol.to_string()),
2457                    DataValue::Float(close),
2458                    DataValue::Integer(volume),
2459                ]))
2460                .expect("Should add row successfully");
2461        }
2462
2463        Arc::new(table)
2464    }
2465}
2466
// Additional query-engine tests are kept in the separate file named by the
// `#[path]` attribute; the module is only compiled for test builds.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;