sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26    TableFunction,
27};
28
29/// Query engine that executes SQL directly on `DataTable`
30#[derive(Clone)]
31pub struct QueryEngine {
32    case_insensitive: bool,
33    date_notation: String,
34    behavior_config: Option<BehaviorConfig>,
35}
36
37impl Default for QueryEngine {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl QueryEngine {
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            case_insensitive: false,
48            date_notation: get_date_notation(),
49            behavior_config: None,
50        }
51    }
52
53    #[must_use]
54    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55        let case_insensitive = config.case_insensitive_default;
56        // Use get_date_notation() to respect environment variable override
57        let date_notation = get_date_notation();
58        Self {
59            case_insensitive,
60            date_notation,
61            behavior_config: Some(config),
62        }
63    }
64
65    #[must_use]
66    pub fn with_date_notation(_date_notation: String) -> Self {
67        Self {
68            case_insensitive: false,
69            date_notation: get_date_notation(), // Always use the global function
70            behavior_config: None,
71        }
72    }
73
74    #[must_use]
75    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76        Self {
77            case_insensitive,
78            date_notation: get_date_notation(),
79            behavior_config: None,
80        }
81    }
82
83    #[must_use]
84    pub fn with_case_insensitive_and_date_notation(
85        case_insensitive: bool,
86        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
87    ) -> Self {
88        Self {
89            case_insensitive,
90            date_notation: get_date_notation(), // Always use the global function
91            behavior_config: None,
92        }
93    }
94
95    /// Find a column name similar to the given name using edit distance
96    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97        let columns = table.column_names();
98        let mut best_match: Option<(String, usize)> = None;
99
100        for col in columns {
101            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102            // Only suggest if distance is small (likely a typo)
103            // Allow up to 3 edits for longer names
104            let max_distance = if name.len() > 10 { 3 } else { 2 };
105            if distance <= max_distance {
106                match &best_match {
107                    None => best_match = Some((col, distance)),
108                    Some((_, best_dist)) if distance < *best_dist => {
109                        best_match = Some((col, distance));
110                    }
111                    _ => {}
112                }
113            }
114        }
115
116        best_match.map(|(name, _)| name)
117    }
118
119    /// Calculate Levenshtein edit distance between two strings
120    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121        let len1 = s1.len();
122        let len2 = s2.len();
123        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125        for i in 0..=len1 {
126            matrix[i][0] = i;
127        }
128        for j in 0..=len2 {
129            matrix[0][j] = j;
130        }
131
132        for (i, c1) in s1.chars().enumerate() {
133            for (j, c2) in s2.chars().enumerate() {
134                let cost = usize::from(c1 != c2);
135                matrix[i + 1][j + 1] = std::cmp::min(
136                    matrix[i][j + 1] + 1, // deletion
137                    std::cmp::min(
138                        matrix[i + 1][j] + 1, // insertion
139                        matrix[i][j] + cost,  // substitution
140                    ),
141                );
142            }
143        }
144
145        matrix[len1][len2]
146    }
147
148    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
149    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150        let (view, _plan) = self.execute_with_plan(table, sql)?;
151        Ok(view)
152    }
153
154    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
155    pub fn execute_statement(
156        &self,
157        table: Arc<DataTable>,
158        statement: SelectStatement,
159    ) -> Result<DataView> {
160        // First process CTEs to build context
161        let mut cte_context = HashMap::new();
162        for cte in &statement.ctes {
163            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164            // Execute the CTE based on its type
165            let cte_result = match &cte.cte_type {
166                CTEType::Standard(query) => {
167                    // Execute the CTE query (it might reference earlier CTEs)
168                    self.build_view_with_context(table.clone(), query.clone(), &mut cte_context)?
169                }
170                CTEType::Web(web_spec) => {
171                    // Fetch data from URL
172                    use crate::web::http_fetcher::WebDataFetcher;
173
174                    let fetcher = WebDataFetcher::new()?;
175                    let data_table = fetcher.fetch(web_spec, &cte.name)?;
176
177                    // Convert DataTable to DataView
178                    DataView::new(Arc::new(data_table))
179                }
180            };
181            // Store the result in the context for later use
182            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
183            debug!(
184                "QueryEngine: CTE '{}' pre-processed, stored in context",
185                cte.name
186            );
187        }
188
189        // Now process subqueries with CTE context available
190        let mut subquery_executor =
191            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
192        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
193
194        // Build the view with the same CTE context
195        self.build_view_with_context(table, processed_statement, &mut cte_context)
196    }
197
198    /// Execute a statement with provided CTE context (for subqueries)
199    pub fn execute_statement_with_cte_context(
200        &self,
201        table: Arc<DataTable>,
202        statement: SelectStatement,
203        cte_context: &HashMap<String, Arc<DataView>>,
204    ) -> Result<DataView> {
205        // Clone the context so we can add any CTEs from this statement
206        let mut local_context = cte_context.clone();
207
208        // Process any CTEs in this statement (they might be nested)
209        for cte in &statement.ctes {
210            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
211            let cte_result = match &cte.cte_type {
212                CTEType::Standard(query) => {
213                    self.build_view_with_context(table.clone(), query.clone(), &mut local_context)?
214                }
215                CTEType::Web(web_spec) => {
216                    // Fetch data from URL
217                    use crate::web::http_fetcher::WebDataFetcher;
218
219                    let fetcher = WebDataFetcher::new()?;
220                    let data_table = fetcher.fetch(web_spec, &cte.name)?;
221
222                    // Convert DataTable to DataView
223                    DataView::new(Arc::new(data_table))
224                }
225            };
226            local_context.insert(cte.name.clone(), Arc::new(cte_result));
227        }
228
229        // Process subqueries with the complete context
230        let mut subquery_executor =
231            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
232        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
233
234        // Build the view
235        self.build_view_with_context(table, processed_statement, &mut local_context)
236    }
237
238    /// Execute a query and return both the result and the execution plan
239    pub fn execute_with_plan(
240        &self,
241        table: Arc<DataTable>,
242        sql: &str,
243    ) -> Result<(DataView, ExecutionPlan)> {
244        let mut plan_builder = ExecutionPlanBuilder::new();
245        let start_time = Instant::now();
246
247        // Parse the SQL query
248        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
249        plan_builder.add_detail(format!("Query: {}", sql));
250        let mut parser = Parser::new(sql);
251        let statement = parser
252            .parse()
253            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
254        plan_builder.add_detail(format!("Parsed successfully"));
255        if let Some(from) = &statement.from_table {
256            plan_builder.add_detail(format!("FROM: {}", from));
257        }
258        if statement.where_clause.is_some() {
259            plan_builder.add_detail("WHERE clause present".to_string());
260        }
261        plan_builder.end_step();
262
263        // First process CTEs to build context
264        let mut cte_context = HashMap::new();
265
266        if !statement.ctes.is_empty() {
267            plan_builder.begin_step(
268                StepType::CTE,
269                format!("Process {} CTEs", statement.ctes.len()),
270            );
271
272            for cte in &statement.ctes {
273                let cte_start = Instant::now();
274                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
275
276                let cte_result = match &cte.cte_type {
277                    CTEType::Standard(query) => {
278                        // Add CTE query details
279                        if let Some(from) = &query.from_table {
280                            plan_builder.add_detail(format!("Source: {}", from));
281                        }
282                        if query.where_clause.is_some() {
283                            plan_builder.add_detail("Has WHERE clause".to_string());
284                        }
285                        if query.group_by.is_some() {
286                            plan_builder.add_detail("Has GROUP BY".to_string());
287                        }
288
289                        debug!(
290                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
291                            cte.name,
292                            cte_context.keys().collect::<Vec<_>>()
293                        );
294
295                        // Process subqueries in the CTE's query FIRST
296                        // This allows the subqueries to see all previously defined CTEs
297                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
298                            self.clone(),
299                            table.clone(),
300                            cte_context.clone(),
301                        );
302                        let processed_query = subquery_executor.execute_subqueries(query)?;
303
304                        self.build_view_with_context(
305                            table.clone(),
306                            processed_query,
307                            &mut cte_context,
308                        )?
309                    }
310                    CTEType::Web(web_spec) => {
311                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
312                        if let Some(format) = &web_spec.format {
313                            plan_builder.add_detail(format!("Format: {:?}", format));
314                        }
315                        if let Some(cache) = web_spec.cache_seconds {
316                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
317                        }
318
319                        // Fetch data from URL
320                        use crate::web::http_fetcher::WebDataFetcher;
321
322                        let fetcher = WebDataFetcher::new()?;
323                        let data_table = fetcher.fetch(web_spec, &cte.name)?;
324
325                        // Convert DataTable to DataView
326                        DataView::new(Arc::new(data_table))
327                    }
328                };
329
330                // Record CTE statistics
331                plan_builder.set_rows_out(cte_result.row_count());
332                plan_builder.add_detail(format!(
333                    "Result: {} rows, {} columns",
334                    cte_result.row_count(),
335                    cte_result.column_count()
336                ));
337                plan_builder.add_detail(format!(
338                    "Execution time: {:.3}ms",
339                    cte_start.elapsed().as_secs_f64() * 1000.0
340                ));
341
342                debug!(
343                    "QueryEngine: Storing CTE '{}' in context with {} rows",
344                    cte.name,
345                    cte_result.row_count()
346                );
347                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
348                plan_builder.end_step();
349            }
350
351            plan_builder.add_detail(format!(
352                "All {} CTEs cached in context",
353                statement.ctes.len()
354            ));
355            plan_builder.end_step();
356        }
357
358        // Process subqueries in the statement with CTE context
359        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
360        let mut subquery_executor =
361            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
362
363        // Check if there are subqueries to process
364        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
365            // This is a simplified check - in reality we'd need to walk the AST
366            format!("{:?}", w).contains("Subquery")
367        });
368
369        if has_subqueries {
370            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
371        }
372
373        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
374
375        if has_subqueries {
376            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
377        } else {
378            plan_builder.add_detail("No subqueries to process".to_string());
379        }
380
381        plan_builder.end_step();
382        let result = self.build_view_with_context_and_plan(
383            table,
384            processed_statement,
385            &mut cte_context,
386            &mut plan_builder,
387        )?;
388
389        let total_duration = start_time.elapsed();
390        info!(
391            "Query execution complete: total={:?}, rows={}",
392            total_duration,
393            result.row_count()
394        );
395
396        let plan = plan_builder.build();
397        Ok((result, plan))
398    }
399
400    /// Build a `DataView` from a parsed SQL statement
401    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
402        let mut cte_context = HashMap::new();
403        self.build_view_with_context(table, statement, &mut cte_context)
404    }
405
406    /// Build a DataView from a SelectStatement with CTE context
407    fn build_view_with_context(
408        &self,
409        table: Arc<DataTable>,
410        statement: SelectStatement,
411        cte_context: &mut HashMap<String, Arc<DataView>>,
412    ) -> Result<DataView> {
413        let mut dummy_plan = ExecutionPlanBuilder::new();
414        self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
415    }
416
417    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
418    fn build_view_with_context_and_plan(
419        &self,
420        table: Arc<DataTable>,
421        statement: SelectStatement,
422        cte_context: &mut HashMap<String, Arc<DataView>>,
423        plan: &mut ExecutionPlanBuilder,
424    ) -> Result<DataView> {
425        // First, process any CTEs that aren't already in the context
426        for cte in &statement.ctes {
427            // Skip if already processed (e.g., by execute_select for WEB CTEs)
428            if cte_context.contains_key(&cte.name) {
429                debug!(
430                    "QueryEngine: CTE '{}' already in context, skipping",
431                    cte.name
432                );
433                continue;
434            }
435
436            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
437            debug!(
438                "QueryEngine: Available CTEs for '{}': {:?}",
439                cte.name,
440                cte_context.keys().collect::<Vec<_>>()
441            );
442
443            // Execute the CTE query (it might reference earlier CTEs)
444            let cte_result = match &cte.cte_type {
445                CTEType::Standard(query) => {
446                    self.build_view_with_context(table.clone(), query.clone(), cte_context)?
447                }
448                CTEType::Web(_web_spec) => {
449                    // Web CTEs should have been processed earlier in execute_select
450                    return Err(anyhow!(
451                        "Web CTEs should be processed in execute_select method"
452                    ));
453                }
454            };
455
456            // Store the result in the context for later use
457            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
458            debug!(
459                "QueryEngine: CTE '{}' processed, stored in context",
460                cte.name
461            );
462        }
463
464        // Determine the source table for the main query
465        let source_table = if let Some(ref table_func) = statement.from_function {
466            // Handle table functions like RANGE()
467            debug!("QueryEngine: Processing table function...");
468            match table_func {
469                TableFunction::Range { start, end, step } => {
470                    // Evaluate expressions to get numeric values
471                    let mut evaluator =
472                        ArithmeticEvaluator::with_date_notation(&table, self.date_notation.clone());
473
474                    // Create a dummy row for evaluating constant expressions
475                    let dummy_row = 0;
476
477                    let start_val = evaluator.evaluate(start, dummy_row)?;
478                    let end_val = evaluator.evaluate(end, dummy_row)?;
479                    let step_val = if let Some(step_expr) = step {
480                        Some(evaluator.evaluate(step_expr, dummy_row)?)
481                    } else {
482                        None
483                    };
484
485                    // Convert DataValues to integers
486                    let start_int = match start_val {
487                        DataValue::Integer(i) => i,
488                        DataValue::Float(f) => f as i64,
489                        _ => return Err(anyhow!("RANGE start must be numeric")),
490                    };
491
492                    let end_int = match end_val {
493                        DataValue::Integer(i) => i,
494                        DataValue::Float(f) => f as i64,
495                        _ => return Err(anyhow!("RANGE end must be numeric")),
496                    };
497
498                    let step_int = if let Some(step) = step_val {
499                        match step {
500                            DataValue::Integer(i) => Some(i),
501                            DataValue::Float(f) => Some(f as i64),
502                            _ => return Err(anyhow!("RANGE step must be numeric")),
503                        }
504                    } else {
505                        None
506                    };
507
508                    // Generate the virtual table
509                    VirtualTableGenerator::generate_range(start_int, end_int, step_int, None)?
510                }
511                TableFunction::Split { text, delimiter } => {
512                    // Evaluate expressions to get string values
513                    let mut evaluator =
514                        ArithmeticEvaluator::with_date_notation(&table, self.date_notation.clone());
515
516                    // Create a dummy row for evaluating constant expressions
517                    let dummy_row = 0;
518
519                    let text_val = evaluator.evaluate(text, dummy_row)?;
520                    let delimiter_val = if let Some(delim_expr) = delimiter {
521                        Some(evaluator.evaluate(delim_expr, dummy_row)?)
522                    } else {
523                        None
524                    };
525
526                    // Convert DataValues to strings
527                    let text_str = match text_val {
528                        DataValue::String(s) => s,
529                        DataValue::Null => return Err(anyhow!("SPLIT text cannot be NULL")),
530                        _ => text_val.to_string(),
531                    };
532
533                    let delimiter_str = if let Some(delim) = delimiter_val {
534                        match delim {
535                            DataValue::String(s) => Some(s),
536                            DataValue::Null => None,
537                            _ => Some(delim.to_string()),
538                        }
539                    } else {
540                        None
541                    };
542
543                    // Generate the virtual table from split string
544                    VirtualTableGenerator::generate_split(
545                        &text_str,
546                        delimiter_str.as_deref(),
547                        None,
548                    )?
549                }
550                TableFunction::Generator { name, args } => {
551                    // Use the generator registry to create the table
552                    use crate::sql::generators::GeneratorRegistry;
553
554                    // Create generator registry (could be cached in QueryEngine)
555                    let registry = GeneratorRegistry::new();
556
557                    if let Some(generator) = registry.get(name) {
558                        // Evaluate arguments
559                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
560                            &table,
561                            self.date_notation.clone(),
562                        );
563                        let dummy_row = 0;
564
565                        let mut evaluated_args = Vec::new();
566                        for arg in args {
567                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
568                        }
569
570                        // Generate the table
571                        generator.generate(evaluated_args)?
572                    } else {
573                        return Err(anyhow!("Unknown generator function: {}", name));
574                    }
575                }
576            }
577        } else if let Some(ref subquery) = statement.from_subquery {
578            // Execute the subquery and use its result as the source
579            debug!("QueryEngine: Processing FROM subquery...");
580            let subquery_result =
581                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
582
583            // Convert the DataView to a DataTable for use as source
584            // This materializes the subquery result
585            let materialized = self.materialize_view(subquery_result)?;
586            Arc::new(materialized)
587        } else if let Some(ref table_name) = statement.from_table {
588            // Check if this references a CTE
589            if let Some(cte_view) = cte_context.get(table_name) {
590                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
591                // Materialize the CTE view as a table
592                let materialized = self.materialize_view((**cte_view).clone())?;
593                Arc::new(materialized)
594            } else {
595                // Regular table reference - use the provided table
596                table.clone()
597            }
598        } else {
599            // No FROM clause - use the provided table
600            table.clone()
601        };
602
603        // Process JOINs if present
604        let final_table = if !statement.joins.is_empty() {
605            plan.begin_step(
606                StepType::Join,
607                format!("Process {} JOINs", statement.joins.len()),
608            );
609            plan.set_rows_in(source_table.row_count());
610
611            let join_executor = HashJoinExecutor::new(self.case_insensitive);
612            let mut current_table = source_table;
613
614            for (idx, join_clause) in statement.joins.iter().enumerate() {
615                let join_start = Instant::now();
616                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
617                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
618                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
619                plan.add_detail(format!(
620                    "Executing {:?} JOIN on {}",
621                    join_clause.join_type, join_clause.condition.left_column
622                ));
623
624                // Resolve the right table for the join
625                let right_table = match &join_clause.table {
626                    TableSource::Table(name) => {
627                        // Check if it's a CTE reference
628                        if let Some(cte_view) = cte_context.get(name) {
629                            let materialized = self.materialize_view((**cte_view).clone())?;
630                            Arc::new(materialized)
631                        } else {
632                            // For now, we need the actual table data
633                            // In a real implementation, this would load from file
634                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
635                        }
636                    }
637                    TableSource::DerivedTable { query, alias: _ } => {
638                        // Execute the subquery
639                        let subquery_result = self.build_view_with_context(
640                            table.clone(),
641                            *query.clone(),
642                            cte_context,
643                        )?;
644                        let materialized = self.materialize_view(subquery_result)?;
645                        Arc::new(materialized)
646                    }
647                };
648
649                // Execute the join
650                let joined = join_executor.execute_join(
651                    current_table.clone(),
652                    join_clause,
653                    right_table.clone(),
654                )?;
655
656                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
657                plan.set_rows_out(joined.row_count());
658                plan.add_detail(format!("Result: {} rows", joined.row_count()));
659                plan.add_detail(format!(
660                    "Join time: {:.3}ms",
661                    join_start.elapsed().as_secs_f64() * 1000.0
662                ));
663                plan.end_step();
664
665                current_table = Arc::new(joined);
666            }
667
668            plan.set_rows_out(current_table.row_count());
669            plan.add_detail(format!(
670                "Final result after all joins: {} rows",
671                current_table.row_count()
672            ));
673            plan.end_step();
674            current_table
675        } else {
676            source_table
677        };
678
679        // Continue with the existing build_view logic but using final_table
680        self.build_view_internal_with_plan(final_table, statement, plan)
681    }
682
683    /// Materialize a DataView into a new DataTable
684    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
685        let source = view.source();
686        let mut result_table = DataTable::new("derived");
687
688        // Get the visible columns from the view
689        let visible_cols = view.visible_column_indices().to_vec();
690
691        // Copy column definitions
692        for col_idx in &visible_cols {
693            let col = &source.columns[*col_idx];
694            let new_col = DataColumn {
695                name: col.name.clone(),
696                data_type: col.data_type.clone(),
697                nullable: col.nullable,
698                unique_values: col.unique_values,
699                null_count: col.null_count,
700                metadata: col.metadata.clone(),
701            };
702            result_table.add_column(new_col);
703        }
704
705        // Copy visible rows
706        for row_idx in view.visible_row_indices() {
707            let source_row = &source.rows[*row_idx];
708            let mut new_row = DataRow { values: Vec::new() };
709
710            for col_idx in &visible_cols {
711                new_row.values.push(source_row.values[*col_idx].clone());
712            }
713
714            result_table.add_row(new_row);
715        }
716
717        Ok(result_table)
718    }
719
720    fn build_view_internal(
721        &self,
722        table: Arc<DataTable>,
723        statement: SelectStatement,
724    ) -> Result<DataView> {
725        let mut dummy_plan = ExecutionPlanBuilder::new();
726        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
727    }
728
729    fn build_view_internal_with_plan(
730        &self,
731        table: Arc<DataTable>,
732        statement: SelectStatement,
733        plan: &mut ExecutionPlanBuilder,
734    ) -> Result<DataView> {
735        debug!(
736            "QueryEngine::build_view - select_items: {:?}",
737            statement.select_items
738        );
739        debug!(
740            "QueryEngine::build_view - where_clause: {:?}",
741            statement.where_clause
742        );
743
744        // Start with all rows visible
745        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
746
747        // Apply WHERE clause filtering using recursive evaluator
748        if let Some(where_clause) = &statement.where_clause {
749            let total_rows = table.row_count();
750            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
751            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
752
753            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
754            plan.set_rows_in(total_rows);
755            plan.add_detail(format!("Input: {} rows", total_rows));
756
757            // Add details about WHERE conditions
758            for condition in &where_clause.conditions {
759                plan.add_detail(format!("Condition: {:?}", condition.expr));
760            }
761
762            let filter_start = Instant::now();
763            // Create an evaluation context for caching compiled regexes
764            let mut eval_context = EvaluationContext::new(self.case_insensitive);
765
766            // Filter visible rows based on WHERE clause
767            let mut filtered_rows = Vec::new();
768            for row_idx in visible_rows {
769                // Only log for first few rows to avoid performance impact
770                if row_idx < 3 {
771                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
772                }
773                let mut evaluator =
774                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
775                match evaluator.evaluate(where_clause, row_idx) {
776                    Ok(result) => {
777                        if row_idx < 3 {
778                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
779                        }
780                        if result {
781                            filtered_rows.push(row_idx);
782                        }
783                    }
784                    Err(e) => {
785                        if row_idx < 3 {
786                            debug!(
787                                "QueryEngine: WHERE evaluation error for row {}: {}",
788                                row_idx, e
789                            );
790                        }
791                        // Propagate WHERE clause errors instead of silently ignoring them
792                        return Err(e);
793                    }
794                }
795            }
796
797            // Log regex cache statistics
798            let (compilations, cache_hits) = eval_context.get_stats();
799            if compilations > 0 || cache_hits > 0 {
800                debug!(
801                    "LIKE pattern cache: {} compilations, {} cache hits",
802                    compilations, cache_hits
803                );
804            }
805            visible_rows = filtered_rows;
806            let filter_duration = filter_start.elapsed();
807            info!(
808                "WHERE clause filtering: {} rows -> {} rows in {:?}",
809                total_rows,
810                visible_rows.len(),
811                filter_duration
812            );
813
814            plan.set_rows_out(visible_rows.len());
815            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
816            plan.add_detail(format!(
817                "Filter time: {:.3}ms",
818                filter_duration.as_secs_f64() * 1000.0
819            ));
820            plan.end_step();
821        }
822
823        // Create initial DataView with filtered rows
824        let mut view = DataView::new(table.clone());
825        view = view.with_rows(visible_rows);
826
827        // Handle GROUP BY if present
828        if let Some(group_by_exprs) = &statement.group_by {
829            if !group_by_exprs.is_empty() {
830                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
831
832                plan.begin_step(
833                    StepType::GroupBy,
834                    format!("GROUP BY {} expressions", group_by_exprs.len()),
835                );
836                plan.set_rows_in(view.row_count());
837                plan.add_detail(format!("Input: {} rows", view.row_count()));
838                for expr in group_by_exprs {
839                    plan.add_detail(format!("Group by: {:?}", expr));
840                }
841
842                let group_start = Instant::now();
843                view = self.apply_group_by(
844                    view,
845                    group_by_exprs,
846                    &statement.select_items,
847                    statement.having.as_ref(),
848                )?;
849
850                plan.set_rows_out(view.row_count());
851                plan.add_detail(format!("Output: {} groups", view.row_count()));
852                plan.add_detail(format!(
853                    "Group time: {:.3}ms",
854                    group_start.elapsed().as_secs_f64() * 1000.0
855                ));
856                plan.end_step();
857            }
858        } else {
859            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
860            if !statement.select_items.is_empty() {
861                // Check if we have ANY non-star items (not just the first one)
862                let has_non_star_items = statement
863                    .select_items
864                    .iter()
865                    .any(|item| !matches!(item, SelectItem::Star));
866
867                // Apply select items if:
868                // 1. We have computed expressions or explicit columns
869                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
870                if has_non_star_items || statement.select_items.len() > 1 {
871                    view = self.apply_select_items(view, &statement.select_items)?;
872                }
873                // If it's just a single star, no projection needed
874            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
875                // Fallback to legacy column projection for backward compatibility
876                let column_indices = self.resolve_column_indices(&table, &statement.columns)?;
877                view = view.with_columns(column_indices);
878            }
879        }
880
881        // Apply DISTINCT if specified
882        if statement.distinct {
883            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
884            plan.set_rows_in(view.row_count());
885            plan.add_detail(format!("Input: {} rows", view.row_count()));
886
887            let distinct_start = Instant::now();
888            view = self.apply_distinct(view)?;
889
890            plan.set_rows_out(view.row_count());
891            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
892            plan.add_detail(format!(
893                "Distinct time: {:.3}ms",
894                distinct_start.elapsed().as_secs_f64() * 1000.0
895            ));
896            plan.end_step();
897        }
898
899        // Apply ORDER BY sorting
900        if let Some(order_by_columns) = &statement.order_by {
901            if !order_by_columns.is_empty() {
902                plan.begin_step(
903                    StepType::Sort,
904                    format!("ORDER BY {} columns", order_by_columns.len()),
905                );
906                plan.set_rows_in(view.row_count());
907                for col in order_by_columns {
908                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
909                }
910
911                let sort_start = Instant::now();
912                view = self.apply_multi_order_by(view, order_by_columns)?;
913
914                plan.add_detail(format!(
915                    "Sort time: {:.3}ms",
916                    sort_start.elapsed().as_secs_f64() * 1000.0
917                ));
918                plan.end_step();
919            }
920        }
921
922        // Apply LIMIT/OFFSET
923        if let Some(limit) = statement.limit {
924            let offset = statement.offset.unwrap_or(0);
925            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
926            plan.set_rows_in(view.row_count());
927            if offset > 0 {
928                plan.add_detail(format!("OFFSET: {}", offset));
929            }
930            view = view.with_limit(limit, offset);
931            plan.set_rows_out(view.row_count());
932            plan.add_detail(format!("Output: {} rows", view.row_count()));
933            plan.end_step();
934        }
935
936        Ok(view)
937    }
938
939    /// Resolve column names to indices
940    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
941        let mut indices = Vec::new();
942        let table_columns = table.column_names();
943
944        for col_name in columns {
945            let index = table_columns
946                .iter()
947                .position(|c| c.eq_ignore_ascii_case(col_name))
948                .ok_or_else(|| {
949                    let suggestion = self.find_similar_column(table, col_name);
950                    match suggestion {
951                        Some(similar) => anyhow::anyhow!(
952                            "Column '{}' not found. Did you mean '{}'?",
953                            col_name,
954                            similar
955                        ),
956                        None => anyhow::anyhow!("Column '{}' not found", col_name),
957                    }
958                })?;
959            indices.push(index);
960        }
961
962        Ok(indices)
963    }
964
965    /// Apply SELECT items (columns and computed expressions) to create new view
966    fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
967        debug!(
968            "QueryEngine::apply_select_items - items: {:?}",
969            select_items
970        );
971        debug!(
972            "QueryEngine::apply_select_items - input view has {} rows",
973            view.row_count()
974        );
975
976        // Check if this is an aggregate query:
977        // 1. At least one aggregate function exists
978        // 2. All other items are either aggregates or constants (aggregate-compatible)
979        let has_aggregates = select_items.iter().any(|item| match item {
980            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
981            SelectItem::Column(_) => false,
982            SelectItem::Star => false,
983        });
984
985        let all_aggregate_compatible = select_items.iter().all(|item| match item {
986            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
987            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
988            SelectItem::Star => false,      // Star is not aggregate-compatible
989        });
990
991        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
992            // Special handling for aggregate queries with constants (no GROUP BY)
993            // These should produce exactly one row
994            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
995            return self.apply_aggregate_select(view, select_items);
996        }
997
998        // Check if we need to create computed columns
999        let has_computed_expressions = select_items
1000            .iter()
1001            .any(|item| matches!(item, SelectItem::Expression { .. }));
1002
1003        debug!(
1004            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1005            has_computed_expressions
1006        );
1007
1008        if !has_computed_expressions {
1009            // Simple case: only columns, use existing projection logic
1010            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1011            return Ok(view.with_columns(column_indices));
1012        }
1013
1014        // Complex case: we have computed expressions
1015        // IMPORTANT: We create a PROJECTED view, not a new table
1016        // This preserves the original DataTable reference
1017
1018        let source_table = view.source();
1019        let visible_rows = view.visible_row_indices();
1020
1021        // Create a temporary table just for the computed result view
1022        // But this table is only used for the current query result
1023        let mut computed_table = DataTable::new("query_result");
1024
1025        // First, expand any Star selectors to actual columns
1026        let mut expanded_items = Vec::new();
1027        for item in select_items {
1028            match item {
1029                SelectItem::Star => {
1030                    // Expand * to all columns from source table
1031                    for col_name in source_table.column_names() {
1032                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1033                            col_name.to_string(),
1034                        )));
1035                    }
1036                }
1037                _ => expanded_items.push(item.clone()),
1038            }
1039        }
1040
1041        // Add columns based on expanded SelectItems, handling duplicates
1042        let mut column_name_counts: std::collections::HashMap<String, usize> =
1043            std::collections::HashMap::new();
1044
1045        for item in &expanded_items {
1046            let base_name = match item {
1047                SelectItem::Column(col_ref) => col_ref.name.clone(),
1048                SelectItem::Expression { alias, .. } => alias.clone(),
1049                SelectItem::Star => unreachable!("Star should have been expanded"),
1050            };
1051
1052            // Check if this column name has been used before
1053            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1054            let column_name = if *count == 0 {
1055                // First occurrence, use the name as-is
1056                base_name.clone()
1057            } else {
1058                // Duplicate, append a suffix
1059                format!("{base_name}_{count}")
1060            };
1061            *count += 1;
1062
1063            computed_table.add_column(DataColumn::new(&column_name));
1064        }
1065
1066        // Calculate values for each row
1067        let mut evaluator =
1068            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1069
1070        for &row_idx in visible_rows {
1071            let mut row_values = Vec::new();
1072
1073            for item in &expanded_items {
1074                let value = match item {
1075                    SelectItem::Column(col_ref) => {
1076                        // Simple column reference
1077                        let col_idx =
1078                            source_table
1079                                .get_column_index(&col_ref.name)
1080                                .ok_or_else(|| {
1081                                    let suggestion =
1082                                        self.find_similar_column(source_table, &col_ref.name);
1083                                    match suggestion {
1084                                        Some(similar) => anyhow::anyhow!(
1085                                            "Column '{}' not found. Did you mean '{}'?",
1086                                            col_ref.name,
1087                                            similar
1088                                        ),
1089                                        None => {
1090                                            anyhow::anyhow!("Column '{}' not found", col_ref.name)
1091                                        }
1092                                    }
1093                                })?;
1094                        let row = source_table
1095                            .get_row(row_idx)
1096                            .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1097                        row.get(col_idx)
1098                            .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1099                            .clone()
1100                    }
1101                    SelectItem::Expression { expr, .. } => {
1102                        // Computed expression
1103                        evaluator.evaluate(expr, row_idx)?
1104                    }
1105                    SelectItem::Star => unreachable!("Star should have been expanded"),
1106                };
1107                row_values.push(value);
1108            }
1109
1110            computed_table
1111                .add_row(DataRow::new(row_values))
1112                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1113        }
1114
1115        // Return a view of the computed result
1116        // This is a temporary view for this query only
1117        Ok(DataView::new(Arc::new(computed_table)))
1118    }
1119
1120    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1121    fn apply_aggregate_select(
1122        &self,
1123        view: DataView,
1124        select_items: &[SelectItem],
1125    ) -> Result<DataView> {
1126        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1127
1128        let source_table = view.source();
1129        let mut result_table = DataTable::new("aggregate_result");
1130
1131        // Add columns for each select item
1132        for item in select_items {
1133            let column_name = match item {
1134                SelectItem::Expression { alias, .. } => alias.clone(),
1135                _ => unreachable!("Should only have expressions in aggregate-only query"),
1136            };
1137            result_table.add_column(DataColumn::new(&column_name));
1138        }
1139
1140        // Create evaluator with visible rows from the view (for filtered aggregates)
1141        let visible_rows = view.visible_row_indices().to_vec();
1142        let mut evaluator =
1143            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1144                .with_visible_rows(visible_rows);
1145
1146        // Evaluate each aggregate expression once (they handle all rows internally)
1147        let mut row_values = Vec::new();
1148        for item in select_items {
1149            match item {
1150                SelectItem::Expression { expr, .. } => {
1151                    // The evaluator will handle aggregates over all rows
1152                    // We pass row_index=0 but aggregates ignore it and process all rows
1153                    let value = evaluator.evaluate(expr, 0)?;
1154                    row_values.push(value);
1155                }
1156                _ => unreachable!("Should only have expressions in aggregate-only query"),
1157            }
1158        }
1159
1160        // Add the single result row
1161        result_table
1162            .add_row(DataRow::new(row_values))
1163            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1164
1165        Ok(DataView::new(Arc::new(result_table)))
1166    }
1167
1168    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1169    fn resolve_select_columns(
1170        &self,
1171        table: &DataTable,
1172        select_items: &[SelectItem],
1173    ) -> Result<Vec<usize>> {
1174        let mut indices = Vec::new();
1175        let table_columns = table.column_names();
1176
1177        for item in select_items {
1178            match item {
1179                SelectItem::Column(col_ref) => {
1180                    let index = table_columns
1181                        .iter()
1182                        .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1183                        .ok_or_else(|| {
1184                            let suggestion = self.find_similar_column(table, &col_ref.name);
1185                            match suggestion {
1186                                Some(similar) => anyhow::anyhow!(
1187                                    "Column '{}' not found. Did you mean '{}'?",
1188                                    col_ref.name,
1189                                    similar
1190                                ),
1191                                None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1192                            }
1193                        })?;
1194                    indices.push(index);
1195                }
1196                SelectItem::Star => {
1197                    // Expand * to all column indices
1198                    for i in 0..table_columns.len() {
1199                        indices.push(i);
1200                    }
1201                }
1202                SelectItem::Expression { .. } => {
1203                    return Err(anyhow::anyhow!(
1204                        "Computed expressions require new table creation"
1205                    ));
1206                }
1207            }
1208        }
1209
1210        Ok(indices)
1211    }
1212
1213    /// Apply DISTINCT to remove duplicate rows
1214    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1215        use std::collections::HashSet;
1216
1217        let source = view.source();
1218        let visible_cols = view.visible_column_indices();
1219        let visible_rows = view.visible_row_indices();
1220
1221        // Build a set to track unique rows
1222        let mut seen_rows = HashSet::new();
1223        let mut unique_row_indices = Vec::new();
1224
1225        for &row_idx in visible_rows {
1226            // Build a key representing this row's visible column values
1227            let mut row_key = Vec::new();
1228            for &col_idx in visible_cols {
1229                let value = source
1230                    .get_value(row_idx, col_idx)
1231                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1232                // Convert value to a hashable representation
1233                row_key.push(format!("{:?}", value));
1234            }
1235
1236            // Check if we've seen this row before
1237            if seen_rows.insert(row_key) {
1238                // First time seeing this row combination
1239                unique_row_indices.push(row_idx);
1240            }
1241        }
1242
1243        // Create a new view with only unique rows
1244        Ok(view.with_rows(unique_row_indices))
1245    }
1246
1247    /// Apply multi-column ORDER BY sorting to the view
1248    fn apply_multi_order_by(
1249        &self,
1250        mut view: DataView,
1251        order_by_columns: &[OrderByColumn],
1252    ) -> Result<DataView> {
1253        // Build list of (source_column_index, ascending) tuples
1254        let mut sort_columns = Vec::new();
1255
1256        for order_col in order_by_columns {
1257            // Try to find the column index
1258            // Check in the current view's source table (this handles both regular columns and computed columns)
1259            // This is especially important after GROUP BY where we have a new result table with aggregate aliases
1260            let col_index = view
1261                .source()
1262                .get_column_index(&order_col.column)
1263                .ok_or_else(|| {
1264                    // If not found, provide helpful error with suggestions
1265                    let suggestion = self.find_similar_column(view.source(), &order_col.column);
1266                    match suggestion {
1267                        Some(similar) => anyhow::anyhow!(
1268                            "Column '{}' not found. Did you mean '{}'?",
1269                            order_col.column,
1270                            similar
1271                        ),
1272                        None => {
1273                            // Also list available columns for debugging
1274                            let available_cols = view.source().column_names().join(", ");
1275                            anyhow::anyhow!(
1276                                "Column '{}' not found. Available columns: {}",
1277                                order_col.column,
1278                                available_cols
1279                            )
1280                        }
1281                    }
1282                })?;
1283
1284            let ascending = matches!(order_col.direction, SortDirection::Asc);
1285            sort_columns.push((col_index, ascending));
1286        }
1287
1288        // Apply multi-column sorting
1289        view.apply_multi_sort(&sort_columns)?;
1290        Ok(view)
1291    }
1292
1293    /// Apply GROUP BY to the view with optional HAVING clause
1294    fn apply_group_by(
1295        &self,
1296        view: DataView,
1297        group_by_exprs: &[SqlExpression],
1298        select_items: &[SelectItem],
1299        having: Option<&SqlExpression>,
1300    ) -> Result<DataView> {
1301        // Use the new expression-based GROUP BY implementation
1302        self.apply_group_by_expressions(
1303            view,
1304            group_by_exprs,
1305            select_items,
1306            having,
1307            self.case_insensitive,
1308            self.date_notation.clone(),
1309        )
1310    }
1311
1312    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
1313    /// This helps pre-size hash tables for better performance
1314    pub fn estimate_group_cardinality(
1315        &self,
1316        view: &DataView,
1317        group_by_exprs: &[SqlExpression],
1318    ) -> usize {
1319        // If we have few rows, just return the row count as upper bound
1320        let row_count = view.get_visible_rows().len();
1321        if row_count <= 100 {
1322            return row_count;
1323        }
1324
1325        // Sample first 1000 rows or 10% of data, whichever is smaller
1326        let sample_size = min(1000, row_count / 10).max(100);
1327        let mut seen = FxHashSet::default();
1328
1329        let visible_rows = view.get_visible_rows();
1330        for (i, &row_idx) in visible_rows.iter().enumerate() {
1331            if i >= sample_size {
1332                break;
1333            }
1334
1335            // Evaluate GROUP BY expressions for this row
1336            let mut key_values = Vec::new();
1337            for expr in group_by_exprs {
1338                let mut evaluator = ArithmeticEvaluator::new(view.source());
1339                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1340                key_values.push(value);
1341            }
1342
1343            seen.insert(key_values);
1344        }
1345
1346        // Estimate total cardinality based on sample
1347        let sample_cardinality = seen.len();
1348        let estimated = (sample_cardinality * row_count) / sample_size;
1349
1350        // Cap at row count and ensure minimum of sample cardinality
1351        estimated.min(row_count).max(sample_cardinality)
1352    }
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357    use super::*;
1358    use crate::data::datatable::{DataColumn, DataRow, DataValue};
1359
1360    fn create_test_table() -> Arc<DataTable> {
1361        let mut table = DataTable::new("test");
1362
1363        // Add columns
1364        table.add_column(DataColumn::new("id"));
1365        table.add_column(DataColumn::new("name"));
1366        table.add_column(DataColumn::new("age"));
1367
1368        // Add rows
1369        table
1370            .add_row(DataRow::new(vec![
1371                DataValue::Integer(1),
1372                DataValue::String("Alice".to_string()),
1373                DataValue::Integer(30),
1374            ]))
1375            .unwrap();
1376
1377        table
1378            .add_row(DataRow::new(vec![
1379                DataValue::Integer(2),
1380                DataValue::String("Bob".to_string()),
1381                DataValue::Integer(25),
1382            ]))
1383            .unwrap();
1384
1385        table
1386            .add_row(DataRow::new(vec![
1387                DataValue::Integer(3),
1388                DataValue::String("Charlie".to_string()),
1389                DataValue::Integer(35),
1390            ]))
1391            .unwrap();
1392
1393        Arc::new(table)
1394    }
1395
1396    #[test]
1397    fn test_select_all() {
1398        let table = create_test_table();
1399        let engine = QueryEngine::new();
1400
1401        let view = engine
1402            .execute(table.clone(), "SELECT * FROM users")
1403            .unwrap();
1404        assert_eq!(view.row_count(), 3);
1405        assert_eq!(view.column_count(), 3);
1406    }
1407
1408    #[test]
1409    fn test_select_columns() {
1410        let table = create_test_table();
1411        let engine = QueryEngine::new();
1412
1413        let view = engine
1414            .execute(table.clone(), "SELECT name, age FROM users")
1415            .unwrap();
1416        assert_eq!(view.row_count(), 3);
1417        assert_eq!(view.column_count(), 2);
1418    }
1419
1420    #[test]
1421    fn test_select_with_limit() {
1422        let table = create_test_table();
1423        let engine = QueryEngine::new();
1424
1425        let view = engine
1426            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1427            .unwrap();
1428        assert_eq!(view.row_count(), 2);
1429    }
1430
1431    #[test]
1432    fn test_type_coercion_contains() {
1433        // Initialize tracing for debug output
1434        let _ = tracing_subscriber::fmt()
1435            .with_max_level(tracing::Level::DEBUG)
1436            .try_init();
1437
1438        let mut table = DataTable::new("test");
1439        table.add_column(DataColumn::new("id"));
1440        table.add_column(DataColumn::new("status"));
1441        table.add_column(DataColumn::new("price"));
1442
1443        // Add test data with mixed types
1444        table
1445            .add_row(DataRow::new(vec![
1446                DataValue::Integer(1),
1447                DataValue::String("Pending".to_string()),
1448                DataValue::Float(99.99),
1449            ]))
1450            .unwrap();
1451
1452        table
1453            .add_row(DataRow::new(vec![
1454                DataValue::Integer(2),
1455                DataValue::String("Confirmed".to_string()),
1456                DataValue::Float(150.50),
1457            ]))
1458            .unwrap();
1459
1460        table
1461            .add_row(DataRow::new(vec![
1462                DataValue::Integer(3),
1463                DataValue::String("Pending".to_string()),
1464                DataValue::Float(75.00),
1465            ]))
1466            .unwrap();
1467
1468        let table = Arc::new(table);
1469        let engine = QueryEngine::new();
1470
1471        println!("\n=== Testing WHERE clause with Contains ===");
1472        println!("Table has {} rows", table.row_count());
1473        for i in 0..table.row_count() {
1474            let status = table.get_value(i, 1);
1475            println!("Row {i}: status = {status:?}");
1476        }
1477
1478        // Test 1: Basic string contains (should work)
1479        println!("\n--- Test 1: status.Contains('pend') ---");
1480        let result = engine.execute(
1481            table.clone(),
1482            "SELECT * FROM test WHERE status.Contains('pend')",
1483        );
1484        match result {
1485            Ok(view) => {
1486                println!("SUCCESS: Found {} matching rows", view.row_count());
1487                assert_eq!(view.row_count(), 2); // Should find both Pending rows
1488            }
1489            Err(e) => {
1490                panic!("Query failed: {e}");
1491            }
1492        }
1493
1494        // Test 2: Numeric contains (should work with type coercion)
1495        println!("\n--- Test 2: price.Contains('9') ---");
1496        let result = engine.execute(
1497            table.clone(),
1498            "SELECT * FROM test WHERE price.Contains('9')",
1499        );
1500        match result {
1501            Ok(view) => {
1502                println!(
1503                    "SUCCESS: Found {} matching rows with price containing '9'",
1504                    view.row_count()
1505                );
1506                // Should find 99.99 row
1507                assert!(view.row_count() >= 1);
1508            }
1509            Err(e) => {
1510                panic!("Numeric coercion query failed: {e}");
1511            }
1512        }
1513
1514        println!("\n=== All tests passed! ===");
1515    }
1516
1517    #[test]
1518    fn test_not_in_clause() {
1519        // Initialize tracing for debug output
1520        let _ = tracing_subscriber::fmt()
1521            .with_max_level(tracing::Level::DEBUG)
1522            .try_init();
1523
1524        let mut table = DataTable::new("test");
1525        table.add_column(DataColumn::new("id"));
1526        table.add_column(DataColumn::new("country"));
1527
1528        // Add test data
1529        table
1530            .add_row(DataRow::new(vec![
1531                DataValue::Integer(1),
1532                DataValue::String("CA".to_string()),
1533            ]))
1534            .unwrap();
1535
1536        table
1537            .add_row(DataRow::new(vec![
1538                DataValue::Integer(2),
1539                DataValue::String("US".to_string()),
1540            ]))
1541            .unwrap();
1542
1543        table
1544            .add_row(DataRow::new(vec![
1545                DataValue::Integer(3),
1546                DataValue::String("UK".to_string()),
1547            ]))
1548            .unwrap();
1549
1550        let table = Arc::new(table);
1551        let engine = QueryEngine::new();
1552
1553        println!("\n=== Testing NOT IN clause ===");
1554        println!("Table has {} rows", table.row_count());
1555        for i in 0..table.row_count() {
1556            let country = table.get_value(i, 1);
1557            println!("Row {i}: country = {country:?}");
1558        }
1559
1560        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
1561        println!("\n--- Test: country NOT IN ('CA') ---");
1562        let result = engine.execute(
1563            table.clone(),
1564            "SELECT * FROM test WHERE country NOT IN ('CA')",
1565        );
1566        match result {
1567            Ok(view) => {
1568                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1569                assert_eq!(view.row_count(), 2); // Should find US and UK
1570            }
1571            Err(e) => {
1572                panic!("NOT IN query failed: {e}");
1573            }
1574        }
1575
1576        println!("\n=== NOT IN test complete! ===");
1577    }
1578
1579    #[test]
1580    fn test_case_insensitive_in_and_not_in() {
1581        // Initialize tracing for debug output
1582        let _ = tracing_subscriber::fmt()
1583            .with_max_level(tracing::Level::DEBUG)
1584            .try_init();
1585
1586        let mut table = DataTable::new("test");
1587        table.add_column(DataColumn::new("id"));
1588        table.add_column(DataColumn::new("country"));
1589
1590        // Add test data with mixed case
1591        table
1592            .add_row(DataRow::new(vec![
1593                DataValue::Integer(1),
1594                DataValue::String("CA".to_string()), // uppercase
1595            ]))
1596            .unwrap();
1597
1598        table
1599            .add_row(DataRow::new(vec![
1600                DataValue::Integer(2),
1601                DataValue::String("us".to_string()), // lowercase
1602            ]))
1603            .unwrap();
1604
1605        table
1606            .add_row(DataRow::new(vec![
1607                DataValue::Integer(3),
1608                DataValue::String("UK".to_string()), // uppercase
1609            ]))
1610            .unwrap();
1611
1612        let table = Arc::new(table);
1613
1614        println!("\n=== Testing Case-Insensitive IN clause ===");
1615        println!("Table has {} rows", table.row_count());
1616        for i in 0..table.row_count() {
1617            let country = table.get_value(i, 1);
1618            println!("Row {i}: country = {country:?}");
1619        }
1620
1621        // Test case-insensitive IN - should match 'CA' with 'ca'
1622        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
1623        let engine = QueryEngine::with_case_insensitive(true);
1624        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1625        match result {
1626            Ok(view) => {
1627                println!(
1628                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
1629                    view.row_count()
1630                );
1631                assert_eq!(view.row_count(), 1); // Should find CA row
1632            }
1633            Err(e) => {
1634                panic!("Case-insensitive IN query failed: {e}");
1635            }
1636        }
1637
1638        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
1639        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
1640        let result = engine.execute(
1641            table.clone(),
1642            "SELECT * FROM test WHERE country NOT IN ('ca')",
1643        );
1644        match result {
1645            Ok(view) => {
1646                println!(
1647                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
1648                    view.row_count()
1649                );
1650                assert_eq!(view.row_count(), 2); // Should find us and UK rows
1651            }
1652            Err(e) => {
1653                panic!("Case-insensitive NOT IN query failed: {e}");
1654            }
1655        }
1656
1657        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
1658        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
1659        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
1660        let result = engine_case_sensitive
1661            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1662        match result {
1663            Ok(view) => {
1664                println!(
1665                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
1666                    view.row_count()
1667                );
1668                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
1669            }
1670            Err(e) => {
1671                panic!("Case-sensitive IN query failed: {e}");
1672            }
1673        }
1674
1675        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
1676    }
1677
1678    #[test]
1679    #[ignore = "Parentheses in WHERE clause not yet implemented"]
1680    fn test_parentheses_in_where_clause() {
1681        // Initialize tracing for debug output
1682        let _ = tracing_subscriber::fmt()
1683            .with_max_level(tracing::Level::DEBUG)
1684            .try_init();
1685
1686        let mut table = DataTable::new("test");
1687        table.add_column(DataColumn::new("id"));
1688        table.add_column(DataColumn::new("status"));
1689        table.add_column(DataColumn::new("priority"));
1690
1691        // Add test data
1692        table
1693            .add_row(DataRow::new(vec![
1694                DataValue::Integer(1),
1695                DataValue::String("Pending".to_string()),
1696                DataValue::String("High".to_string()),
1697            ]))
1698            .unwrap();
1699
1700        table
1701            .add_row(DataRow::new(vec![
1702                DataValue::Integer(2),
1703                DataValue::String("Complete".to_string()),
1704                DataValue::String("High".to_string()),
1705            ]))
1706            .unwrap();
1707
1708        table
1709            .add_row(DataRow::new(vec![
1710                DataValue::Integer(3),
1711                DataValue::String("Pending".to_string()),
1712                DataValue::String("Low".to_string()),
1713            ]))
1714            .unwrap();
1715
1716        table
1717            .add_row(DataRow::new(vec![
1718                DataValue::Integer(4),
1719                DataValue::String("Complete".to_string()),
1720                DataValue::String("Low".to_string()),
1721            ]))
1722            .unwrap();
1723
1724        let table = Arc::new(table);
1725        let engine = QueryEngine::new();
1726
1727        println!("\n=== Testing Parentheses in WHERE clause ===");
1728        println!("Table has {} rows", table.row_count());
1729        for i in 0..table.row_count() {
1730            let status = table.get_value(i, 1);
1731            let priority = table.get_value(i, 2);
1732            println!("Row {i}: status = {status:?}, priority = {priority:?}");
1733        }
1734
1735        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
1736        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
1737        let result = engine.execute(
1738            table.clone(),
1739            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
1740        );
1741        match result {
1742            Ok(view) => {
1743                println!(
1744                    "SUCCESS: Found {} rows with parenthetical logic",
1745                    view.row_count()
1746                );
1747                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
1748            }
1749            Err(e) => {
1750                panic!("Parentheses query failed: {e}");
1751            }
1752        }
1753
1754        println!("\n=== Parentheses test complete! ===");
1755    }
1756
1757    #[test]
1758    #[ignore = "Numeric type coercion needs fixing"]
1759    fn test_numeric_type_coercion() {
1760        // Initialize tracing for debug output
1761        let _ = tracing_subscriber::fmt()
1762            .with_max_level(tracing::Level::DEBUG)
1763            .try_init();
1764
1765        let mut table = DataTable::new("test");
1766        table.add_column(DataColumn::new("id"));
1767        table.add_column(DataColumn::new("price"));
1768        table.add_column(DataColumn::new("quantity"));
1769
1770        // Add test data with different numeric types
1771        table
1772            .add_row(DataRow::new(vec![
1773                DataValue::Integer(1),
1774                DataValue::Float(99.50), // Contains '.'
1775                DataValue::Integer(100),
1776            ]))
1777            .unwrap();
1778
1779        table
1780            .add_row(DataRow::new(vec![
1781                DataValue::Integer(2),
1782                DataValue::Float(150.0), // Contains '.' and '0'
1783                DataValue::Integer(200),
1784            ]))
1785            .unwrap();
1786
1787        table
1788            .add_row(DataRow::new(vec![
1789                DataValue::Integer(3),
1790                DataValue::Integer(75), // No decimal point
1791                DataValue::Integer(50),
1792            ]))
1793            .unwrap();
1794
1795        let table = Arc::new(table);
1796        let engine = QueryEngine::new();
1797
1798        println!("\n=== Testing Numeric Type Coercion ===");
1799        println!("Table has {} rows", table.row_count());
1800        for i in 0..table.row_count() {
1801            let price = table.get_value(i, 1);
1802            let quantity = table.get_value(i, 2);
1803            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
1804        }
1805
1806        // Test Contains on float values - should find rows with decimal points
1807        println!("\n--- Test: price.Contains('.') ---");
1808        let result = engine.execute(
1809            table.clone(),
1810            "SELECT * FROM test WHERE price.Contains('.')",
1811        );
1812        match result {
1813            Ok(view) => {
1814                println!(
1815                    "SUCCESS: Found {} rows with decimal points in price",
1816                    view.row_count()
1817                );
1818                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
1819            }
1820            Err(e) => {
1821                panic!("Numeric Contains query failed: {e}");
1822            }
1823        }
1824
1825        // Test Contains on integer values converted to string
1826        println!("\n--- Test: quantity.Contains('0') ---");
1827        let result = engine.execute(
1828            table.clone(),
1829            "SELECT * FROM test WHERE quantity.Contains('0')",
1830        );
1831        match result {
1832            Ok(view) => {
1833                println!(
1834                    "SUCCESS: Found {} rows with '0' in quantity",
1835                    view.row_count()
1836                );
1837                assert_eq!(view.row_count(), 2); // Should find 100 and 200
1838            }
1839            Err(e) => {
1840                panic!("Integer Contains query failed: {e}");
1841            }
1842        }
1843
1844        println!("\n=== Numeric type coercion test complete! ===");
1845    }
1846
1847    #[test]
1848    fn test_datetime_comparisons() {
1849        // Initialize tracing for debug output
1850        let _ = tracing_subscriber::fmt()
1851            .with_max_level(tracing::Level::DEBUG)
1852            .try_init();
1853
1854        let mut table = DataTable::new("test");
1855        table.add_column(DataColumn::new("id"));
1856        table.add_column(DataColumn::new("created_date"));
1857
1858        // Add test data with date strings (as they would come from CSV)
1859        table
1860            .add_row(DataRow::new(vec![
1861                DataValue::Integer(1),
1862                DataValue::String("2024-12-15".to_string()),
1863            ]))
1864            .unwrap();
1865
1866        table
1867            .add_row(DataRow::new(vec![
1868                DataValue::Integer(2),
1869                DataValue::String("2025-01-15".to_string()),
1870            ]))
1871            .unwrap();
1872
1873        table
1874            .add_row(DataRow::new(vec![
1875                DataValue::Integer(3),
1876                DataValue::String("2025-02-15".to_string()),
1877            ]))
1878            .unwrap();
1879
1880        let table = Arc::new(table);
1881        let engine = QueryEngine::new();
1882
1883        println!("\n=== Testing DateTime Comparisons ===");
1884        println!("Table has {} rows", table.row_count());
1885        for i in 0..table.row_count() {
1886            let date = table.get_value(i, 1);
1887            println!("Row {i}: created_date = {date:?}");
1888        }
1889
1890        // Test DateTime constructor comparison - should find dates after 2025-01-01
1891        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
1892        let result = engine.execute(
1893            table.clone(),
1894            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
1895        );
1896        match result {
1897            Ok(view) => {
1898                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
1899                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
1900            }
1901            Err(e) => {
1902                panic!("DateTime comparison query failed: {e}");
1903            }
1904        }
1905
1906        println!("\n=== DateTime comparison test complete! ===");
1907    }
1908
1909    #[test]
1910    fn test_not_with_method_calls() {
1911        // Initialize tracing for debug output
1912        let _ = tracing_subscriber::fmt()
1913            .with_max_level(tracing::Level::DEBUG)
1914            .try_init();
1915
1916        let mut table = DataTable::new("test");
1917        table.add_column(DataColumn::new("id"));
1918        table.add_column(DataColumn::new("status"));
1919
1920        // Add test data
1921        table
1922            .add_row(DataRow::new(vec![
1923                DataValue::Integer(1),
1924                DataValue::String("Pending Review".to_string()),
1925            ]))
1926            .unwrap();
1927
1928        table
1929            .add_row(DataRow::new(vec![
1930                DataValue::Integer(2),
1931                DataValue::String("Complete".to_string()),
1932            ]))
1933            .unwrap();
1934
1935        table
1936            .add_row(DataRow::new(vec![
1937                DataValue::Integer(3),
1938                DataValue::String("Pending Approval".to_string()),
1939            ]))
1940            .unwrap();
1941
1942        let table = Arc::new(table);
1943        let engine = QueryEngine::with_case_insensitive(true);
1944
1945        println!("\n=== Testing NOT with Method Calls ===");
1946        println!("Table has {} rows", table.row_count());
1947        for i in 0..table.row_count() {
1948            let status = table.get_value(i, 1);
1949            println!("Row {i}: status = {status:?}");
1950        }
1951
1952        // Test NOT with Contains - should exclude rows containing "pend"
1953        println!("\n--- Test: NOT status.Contains('pend') ---");
1954        let result = engine.execute(
1955            table.clone(),
1956            "SELECT * FROM test WHERE NOT status.Contains('pend')",
1957        );
1958        match result {
1959            Ok(view) => {
1960                println!(
1961                    "SUCCESS: Found {} rows NOT containing 'pend'",
1962                    view.row_count()
1963                );
1964                assert_eq!(view.row_count(), 1); // Should find only "Complete"
1965            }
1966            Err(e) => {
1967                panic!("NOT Contains query failed: {e}");
1968            }
1969        }
1970
1971        // Test NOT with StartsWith
1972        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
1973        let result = engine.execute(
1974            table.clone(),
1975            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
1976        );
1977        match result {
1978            Ok(view) => {
1979                println!(
1980                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
1981                    view.row_count()
1982                );
1983                assert_eq!(view.row_count(), 1); // Should find only "Complete"
1984            }
1985            Err(e) => {
1986                panic!("NOT StartsWith query failed: {e}");
1987            }
1988        }
1989
1990        println!("\n=== NOT with method calls test complete! ===");
1991    }
1992
1993    #[test]
1994    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
1995    fn test_complex_logical_expressions() {
1996        // Initialize tracing for debug output
1997        let _ = tracing_subscriber::fmt()
1998            .with_max_level(tracing::Level::DEBUG)
1999            .try_init();
2000
2001        let mut table = DataTable::new("test");
2002        table.add_column(DataColumn::new("id"));
2003        table.add_column(DataColumn::new("status"));
2004        table.add_column(DataColumn::new("priority"));
2005        table.add_column(DataColumn::new("assigned"));
2006
2007        // Add comprehensive test data
2008        table
2009            .add_row(DataRow::new(vec![
2010                DataValue::Integer(1),
2011                DataValue::String("Pending".to_string()),
2012                DataValue::String("High".to_string()),
2013                DataValue::String("John".to_string()),
2014            ]))
2015            .unwrap();
2016
2017        table
2018            .add_row(DataRow::new(vec![
2019                DataValue::Integer(2),
2020                DataValue::String("Complete".to_string()),
2021                DataValue::String("High".to_string()),
2022                DataValue::String("Jane".to_string()),
2023            ]))
2024            .unwrap();
2025
2026        table
2027            .add_row(DataRow::new(vec![
2028                DataValue::Integer(3),
2029                DataValue::String("Pending".to_string()),
2030                DataValue::String("Low".to_string()),
2031                DataValue::String("John".to_string()),
2032            ]))
2033            .unwrap();
2034
2035        table
2036            .add_row(DataRow::new(vec![
2037                DataValue::Integer(4),
2038                DataValue::String("In Progress".to_string()),
2039                DataValue::String("Medium".to_string()),
2040                DataValue::String("Jane".to_string()),
2041            ]))
2042            .unwrap();
2043
2044        let table = Arc::new(table);
2045        let engine = QueryEngine::new();
2046
2047        println!("\n=== Testing Complex Logical Expressions ===");
2048        println!("Table has {} rows", table.row_count());
2049        for i in 0..table.row_count() {
2050            let status = table.get_value(i, 1);
2051            let priority = table.get_value(i, 2);
2052            let assigned = table.get_value(i, 3);
2053            println!(
2054                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2055            );
2056        }
2057
2058        // Test complex AND/OR logic
2059        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2060        let result = engine.execute(
2061            table.clone(),
2062            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2063        );
2064        match result {
2065            Ok(view) => {
2066                println!(
2067                    "SUCCESS: Found {} rows with complex logic",
2068                    view.row_count()
2069                );
2070                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2071            }
2072            Err(e) => {
2073                panic!("Complex logic query failed: {e}");
2074            }
2075        }
2076
2077        // Test NOT with complex expressions
2078        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2079        let result = engine.execute(
2080            table.clone(),
2081            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2082        );
2083        match result {
2084            Ok(view) => {
2085                println!(
2086                    "SUCCESS: Found {} rows with NOT complex logic",
2087                    view.row_count()
2088                );
2089                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2090            }
2091            Err(e) => {
2092                panic!("NOT complex logic query failed: {e}");
2093            }
2094        }
2095
2096        println!("\n=== Complex logical expressions test complete! ===");
2097    }
2098
2099    #[test]
2100    fn test_mixed_data_types_and_edge_cases() {
2101        // Initialize tracing for debug output
2102        let _ = tracing_subscriber::fmt()
2103            .with_max_level(tracing::Level::DEBUG)
2104            .try_init();
2105
2106        let mut table = DataTable::new("test");
2107        table.add_column(DataColumn::new("id"));
2108        table.add_column(DataColumn::new("value"));
2109        table.add_column(DataColumn::new("nullable_field"));
2110
2111        // Add test data with mixed types and edge cases
2112        table
2113            .add_row(DataRow::new(vec![
2114                DataValue::Integer(1),
2115                DataValue::String("123.45".to_string()),
2116                DataValue::String("present".to_string()),
2117            ]))
2118            .unwrap();
2119
2120        table
2121            .add_row(DataRow::new(vec![
2122                DataValue::Integer(2),
2123                DataValue::Float(678.90),
2124                DataValue::Null,
2125            ]))
2126            .unwrap();
2127
2128        table
2129            .add_row(DataRow::new(vec![
2130                DataValue::Integer(3),
2131                DataValue::Boolean(true),
2132                DataValue::String("also present".to_string()),
2133            ]))
2134            .unwrap();
2135
2136        table
2137            .add_row(DataRow::new(vec![
2138                DataValue::Integer(4),
2139                DataValue::String("false".to_string()),
2140                DataValue::Null,
2141            ]))
2142            .unwrap();
2143
2144        let table = Arc::new(table);
2145        let engine = QueryEngine::new();
2146
2147        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2148        println!("Table has {} rows", table.row_count());
2149        for i in 0..table.row_count() {
2150            let value = table.get_value(i, 1);
2151            let nullable = table.get_value(i, 2);
2152            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2153        }
2154
2155        // Test type coercion with boolean Contains
2156        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2157        let result = engine.execute(
2158            table.clone(),
2159            "SELECT * FROM test WHERE value.Contains('true')",
2160        );
2161        match result {
2162            Ok(view) => {
2163                println!(
2164                    "SUCCESS: Found {} rows with boolean coercion",
2165                    view.row_count()
2166                );
2167                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2168            }
2169            Err(e) => {
2170                panic!("Boolean coercion query failed: {e}");
2171            }
2172        }
2173
2174        // Test multiple IN values with mixed types
2175        println!("\n--- Test: id IN (1, 3) ---");
2176        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2177        match result {
2178            Ok(view) => {
2179                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2180                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2181            }
2182            Err(e) => {
2183                panic!("Multiple IN values query failed: {e}");
2184            }
2185        }
2186
2187        println!("\n=== Mixed data types test complete! ===");
2188    }
2189
2190    /// Test that aggregate-only queries return exactly one row (regression test)
2191    #[test]
2192    fn test_aggregate_only_single_row() {
2193        let table = create_test_stock_data();
2194        let engine = QueryEngine::new();
2195
2196        // Test query with multiple aggregates - should return exactly 1 row
2197        let result = engine
2198            .execute(
2199                table.clone(),
2200                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2201            )
2202            .expect("Query should succeed");
2203
2204        assert_eq!(
2205            result.row_count(),
2206            1,
2207            "Aggregate-only query should return exactly 1 row"
2208        );
2209        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2210
2211        // Verify the actual values are correct
2212        let source = result.source();
2213        let row = source.get_row(0).expect("Should have first row");
2214
2215        // COUNT(*) should be 5 (total rows)
2216        assert_eq!(row.values[0], DataValue::Integer(5));
2217
2218        // MIN should be 99.5
2219        assert_eq!(row.values[1], DataValue::Float(99.5));
2220
2221        // MAX should be 105.0
2222        assert_eq!(row.values[2], DataValue::Float(105.0));
2223
2224        // AVG should be approximately 102.4
2225        if let DataValue::Float(avg) = &row.values[3] {
2226            assert!(
2227                (avg - 102.4).abs() < 0.01,
2228                "Average should be approximately 102.4, got {}",
2229                avg
2230            );
2231        } else {
2232            panic!("AVG should return a Float value");
2233        }
2234    }
2235
2236    /// Test single aggregate function returns single row
2237    #[test]
2238    fn test_single_aggregate_single_row() {
2239        let table = create_test_stock_data();
2240        let engine = QueryEngine::new();
2241
2242        let result = engine
2243            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2244            .expect("Query should succeed");
2245
2246        assert_eq!(
2247            result.row_count(),
2248            1,
2249            "Single aggregate query should return exactly 1 row"
2250        );
2251        assert_eq!(result.column_count(), 1, "Should have 1 column");
2252
2253        let source = result.source();
2254        let row = source.get_row(0).expect("Should have first row");
2255        assert_eq!(row.values[0], DataValue::Integer(5));
2256    }
2257
2258    /// Test aggregate with WHERE clause filtering
2259    #[test]
2260    fn test_aggregate_with_where_single_row() {
2261        let table = create_test_stock_data();
2262        let engine = QueryEngine::new();
2263
2264        // Filter to only high-value stocks (>= 103.0) and aggregate
2265        let result = engine
2266            .execute(
2267                table.clone(),
2268                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2269            )
2270            .expect("Query should succeed");
2271
2272        assert_eq!(
2273            result.row_count(),
2274            1,
2275            "Filtered aggregate query should return exactly 1 row"
2276        );
2277        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2278
2279        let source = result.source();
2280        let row = source.get_row(0).expect("Should have first row");
2281
2282        // Should find 2 rows (103.5 and 105.0)
2283        assert_eq!(row.values[0], DataValue::Integer(2));
2284        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2285        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2286    }
2287
2288    #[test]
2289    fn test_not_in_parsing() {
2290        use crate::sql::recursive_parser::Parser;
2291
2292        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2293        println!("\n=== Testing NOT IN parsing ===");
2294        println!("Parsing query: {query}");
2295
2296        let mut parser = Parser::new(query);
2297        match parser.parse() {
2298            Ok(statement) => {
2299                println!("Parsed statement: {statement:#?}");
2300                if let Some(where_clause) = statement.where_clause {
2301                    println!("WHERE conditions: {:#?}", where_clause.conditions);
2302                    if let Some(first_condition) = where_clause.conditions.first() {
2303                        println!("First condition expression: {:#?}", first_condition.expr);
2304                    }
2305                }
2306            }
2307            Err(e) => {
2308                panic!("Parse error: {e}");
2309            }
2310        }
2311    }
2312
2313    /// Create test stock data for aggregate testing
2314    fn create_test_stock_data() -> Arc<DataTable> {
2315        let mut table = DataTable::new("stock");
2316
2317        table.add_column(DataColumn::new("symbol"));
2318        table.add_column(DataColumn::new("close"));
2319        table.add_column(DataColumn::new("volume"));
2320
2321        // Add 5 rows of test data
2322        let test_data = vec![
2323            ("AAPL", 99.5, 1000),
2324            ("AAPL", 101.2, 1500),
2325            ("AAPL", 103.5, 2000),
2326            ("AAPL", 105.0, 1200),
2327            ("AAPL", 102.8, 1800),
2328        ];
2329
2330        for (symbol, close, volume) in test_data {
2331            table
2332                .add_row(DataRow::new(vec![
2333                    DataValue::String(symbol.to_string()),
2334                    DataValue::Float(close),
2335                    DataValue::Integer(volume),
2336                ]))
2337                .expect("Should add row successfully");
2338        }
2339
2340        Arc::new(table)
2341    }
2342}
2343
2344#[cfg(test)]
2345#[path = "query_engine_tests.rs"]
2346mod query_engine_tests;