sql_cli/data/
query_engine.rs

1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25    CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26    TableFunction,
27};
28
29/// Query engine that executes SQL directly on `DataTable`
30#[derive(Clone)]
31pub struct QueryEngine {
32    case_insensitive: bool,
33    date_notation: String,
34    behavior_config: Option<BehaviorConfig>,
35}
36
37impl Default for QueryEngine {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl QueryEngine {
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            case_insensitive: false,
48            date_notation: get_date_notation(),
49            behavior_config: None,
50        }
51    }
52
53    #[must_use]
54    pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55        let case_insensitive = config.case_insensitive_default;
56        // Use get_date_notation() to respect environment variable override
57        let date_notation = get_date_notation();
58        Self {
59            case_insensitive,
60            date_notation,
61            behavior_config: Some(config),
62        }
63    }
64
65    #[must_use]
66    pub fn with_date_notation(_date_notation: String) -> Self {
67        Self {
68            case_insensitive: false,
69            date_notation: get_date_notation(), // Always use the global function
70            behavior_config: None,
71        }
72    }
73
74    #[must_use]
75    pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76        Self {
77            case_insensitive,
78            date_notation: get_date_notation(),
79            behavior_config: None,
80        }
81    }
82
83    #[must_use]
84    pub fn with_case_insensitive_and_date_notation(
85        case_insensitive: bool,
86        _date_notation: String, // Keep parameter for compatibility but use get_date_notation()
87    ) -> Self {
88        Self {
89            case_insensitive,
90            date_notation: get_date_notation(), // Always use the global function
91            behavior_config: None,
92        }
93    }
94
95    /// Find a column name similar to the given name using edit distance
96    fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97        let columns = table.column_names();
98        let mut best_match: Option<(String, usize)> = None;
99
100        for col in columns {
101            let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102            // Only suggest if distance is small (likely a typo)
103            // Allow up to 3 edits for longer names
104            let max_distance = if name.len() > 10 { 3 } else { 2 };
105            if distance <= max_distance {
106                match &best_match {
107                    None => best_match = Some((col, distance)),
108                    Some((_, best_dist)) if distance < *best_dist => {
109                        best_match = Some((col, distance));
110                    }
111                    _ => {}
112                }
113            }
114        }
115
116        best_match.map(|(name, _)| name)
117    }
118
119    /// Calculate Levenshtein edit distance between two strings
120    fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121        let len1 = s1.len();
122        let len2 = s2.len();
123        let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125        for i in 0..=len1 {
126            matrix[i][0] = i;
127        }
128        for j in 0..=len2 {
129            matrix[0][j] = j;
130        }
131
132        for (i, c1) in s1.chars().enumerate() {
133            for (j, c2) in s2.chars().enumerate() {
134                let cost = usize::from(c1 != c2);
135                matrix[i + 1][j + 1] = std::cmp::min(
136                    matrix[i][j + 1] + 1, // deletion
137                    std::cmp::min(
138                        matrix[i + 1][j] + 1, // insertion
139                        matrix[i][j] + cost,  // substitution
140                    ),
141                );
142            }
143        }
144
145        matrix[len1][len2]
146    }
147
148    /// Execute a SQL query on a `DataTable` and return a `DataView` (for backward compatibility)
149    pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150        let (view, _plan) = self.execute_with_plan(table, sql)?;
151        Ok(view)
152    }
153
154    /// Execute a parsed SelectStatement on a `DataTable` and return a `DataView`
155    pub fn execute_statement(
156        &self,
157        table: Arc<DataTable>,
158        statement: SelectStatement,
159    ) -> Result<DataView> {
160        // First process CTEs to build context
161        let mut cte_context = HashMap::new();
162        for cte in &statement.ctes {
163            debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164            // Execute the CTE based on its type
165            let cte_result = match &cte.cte_type {
166                CTEType::Standard(query) => {
167                    // Execute the CTE query (it might reference earlier CTEs)
168                    let view = self.build_view_with_context(
169                        table.clone(),
170                        query.clone(),
171                        &mut cte_context,
172                    )?;
173
174                    // Materialize the view and enrich columns with qualified names
175                    let mut materialized = self.materialize_view(view)?;
176
177                    // Enrich columns with qualified names for proper scoping
178                    for column in materialized.columns_mut() {
179                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
180                        column.source_table = Some(cte.name.clone());
181                    }
182
183                    DataView::new(Arc::new(materialized))
184                }
185                CTEType::Web(web_spec) => {
186                    // Fetch data from URL
187                    use crate::web::http_fetcher::WebDataFetcher;
188
189                    let fetcher = WebDataFetcher::new()?;
190                    let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
191
192                    // Enrich columns with qualified names for proper scoping
193                    for column in data_table.columns_mut() {
194                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195                        column.source_table = Some(cte.name.clone());
196                    }
197
198                    // Convert DataTable to DataView
199                    DataView::new(Arc::new(data_table))
200                }
201            };
202            // Store the result in the context for later use
203            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204            debug!(
205                "QueryEngine: CTE '{}' pre-processed, stored in context",
206                cte.name
207            );
208        }
209
210        // Now process subqueries with CTE context available
211        let mut subquery_executor =
212            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215        // Build the view with the same CTE context
216        self.build_view_with_context(table, processed_statement, &mut cte_context)
217    }
218
219    /// Execute a statement with provided CTE context (for subqueries)
220    pub fn execute_statement_with_cte_context(
221        &self,
222        table: Arc<DataTable>,
223        statement: SelectStatement,
224        cte_context: &HashMap<String, Arc<DataView>>,
225    ) -> Result<DataView> {
226        // Clone the context so we can add any CTEs from this statement
227        let mut local_context = cte_context.clone();
228
229        // Process any CTEs in this statement (they might be nested)
230        for cte in &statement.ctes {
231            debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232            let cte_result = match &cte.cte_type {
233                CTEType::Standard(query) => {
234                    let view = self.build_view_with_context(
235                        table.clone(),
236                        query.clone(),
237                        &mut local_context,
238                    )?;
239
240                    // Materialize the view and enrich columns with qualified names
241                    let mut materialized = self.materialize_view(view)?;
242
243                    // Enrich columns with qualified names for proper scoping
244                    for column in materialized.columns_mut() {
245                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246                        column.source_table = Some(cte.name.clone());
247                    }
248
249                    DataView::new(Arc::new(materialized))
250                }
251                CTEType::Web(web_spec) => {
252                    // Fetch data from URL
253                    use crate::web::http_fetcher::WebDataFetcher;
254
255                    let fetcher = WebDataFetcher::new()?;
256                    let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
257
258                    // Enrich columns with qualified names for proper scoping
259                    for column in data_table.columns_mut() {
260                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261                        column.source_table = Some(cte.name.clone());
262                    }
263
264                    // Convert DataTable to DataView
265                    DataView::new(Arc::new(data_table))
266                }
267            };
268            local_context.insert(cte.name.clone(), Arc::new(cte_result));
269        }
270
271        // Process subqueries with the complete context
272        let mut subquery_executor =
273            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
274        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
275
276        // Build the view
277        self.build_view_with_context(table, processed_statement, &mut local_context)
278    }
279
280    /// Execute a query and return both the result and the execution plan
281    pub fn execute_with_plan(
282        &self,
283        table: Arc<DataTable>,
284        sql: &str,
285    ) -> Result<(DataView, ExecutionPlan)> {
286        let mut plan_builder = ExecutionPlanBuilder::new();
287        let start_time = Instant::now();
288
289        // Parse the SQL query
290        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
291        plan_builder.add_detail(format!("Query: {}", sql));
292        let mut parser = Parser::new(sql);
293        let statement = parser
294            .parse()
295            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
296        plan_builder.add_detail(format!("Parsed successfully"));
297        if let Some(from) = &statement.from_table {
298            plan_builder.add_detail(format!("FROM: {}", from));
299        }
300        if statement.where_clause.is_some() {
301            plan_builder.add_detail("WHERE clause present".to_string());
302        }
303        plan_builder.end_step();
304
305        // First process CTEs to build context
306        let mut cte_context = HashMap::new();
307
308        if !statement.ctes.is_empty() {
309            plan_builder.begin_step(
310                StepType::CTE,
311                format!("Process {} CTEs", statement.ctes.len()),
312            );
313
314            for cte in &statement.ctes {
315                let cte_start = Instant::now();
316                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
317
318                let cte_result = match &cte.cte_type {
319                    CTEType::Standard(query) => {
320                        // Add CTE query details
321                        if let Some(from) = &query.from_table {
322                            plan_builder.add_detail(format!("Source: {}", from));
323                        }
324                        if query.where_clause.is_some() {
325                            plan_builder.add_detail("Has WHERE clause".to_string());
326                        }
327                        if query.group_by.is_some() {
328                            plan_builder.add_detail("Has GROUP BY".to_string());
329                        }
330
331                        debug!(
332                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
333                            cte.name,
334                            cte_context.keys().collect::<Vec<_>>()
335                        );
336
337                        // Process subqueries in the CTE's query FIRST
338                        // This allows the subqueries to see all previously defined CTEs
339                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
340                            self.clone(),
341                            table.clone(),
342                            cte_context.clone(),
343                        );
344                        let processed_query = subquery_executor.execute_subqueries(query)?;
345
346                        let view = self.build_view_with_context(
347                            table.clone(),
348                            processed_query,
349                            &mut cte_context,
350                        )?;
351
352                        // Materialize the view and enrich columns with qualified names
353                        let mut materialized = self.materialize_view(view)?;
354
355                        // Enrich columns with qualified names for proper scoping
356                        for column in materialized.columns_mut() {
357                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
358                            column.source_table = Some(cte.name.clone());
359                        }
360
361                        DataView::new(Arc::new(materialized))
362                    }
363                    CTEType::Web(web_spec) => {
364                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
365                        if let Some(format) = &web_spec.format {
366                            plan_builder.add_detail(format!("Format: {:?}", format));
367                        }
368                        if let Some(cache) = web_spec.cache_seconds {
369                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
370                        }
371
372                        // Fetch data from URL
373                        use crate::web::http_fetcher::WebDataFetcher;
374
375                        let fetcher = WebDataFetcher::new()?;
376                        let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
377
378                        // Enrich columns with qualified names for proper scoping
379                        for column in data_table.columns_mut() {
380                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
381                            column.source_table = Some(cte.name.clone());
382                        }
383
384                        // Convert DataTable to DataView
385                        DataView::new(Arc::new(data_table))
386                    }
387                };
388
389                // Record CTE statistics
390                plan_builder.set_rows_out(cte_result.row_count());
391                plan_builder.add_detail(format!(
392                    "Result: {} rows, {} columns",
393                    cte_result.row_count(),
394                    cte_result.column_count()
395                ));
396                plan_builder.add_detail(format!(
397                    "Execution time: {:.3}ms",
398                    cte_start.elapsed().as_secs_f64() * 1000.0
399                ));
400
401                debug!(
402                    "QueryEngine: Storing CTE '{}' in context with {} rows",
403                    cte.name,
404                    cte_result.row_count()
405                );
406                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
407                plan_builder.end_step();
408            }
409
410            plan_builder.add_detail(format!(
411                "All {} CTEs cached in context",
412                statement.ctes.len()
413            ));
414            plan_builder.end_step();
415        }
416
417        // Process subqueries in the statement with CTE context
418        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
419        let mut subquery_executor =
420            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
421
422        // Check if there are subqueries to process
423        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
424            // This is a simplified check - in reality we'd need to walk the AST
425            format!("{:?}", w).contains("Subquery")
426        });
427
428        if has_subqueries {
429            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
430        }
431
432        let processed_statement = subquery_executor.execute_subqueries(&statement)?;
433
434        if has_subqueries {
435            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
436        } else {
437            plan_builder.add_detail("No subqueries to process".to_string());
438        }
439
440        plan_builder.end_step();
441        let result = self.build_view_with_context_and_plan(
442            table,
443            processed_statement,
444            &mut cte_context,
445            &mut plan_builder,
446        )?;
447
448        let total_duration = start_time.elapsed();
449        info!(
450            "Query execution complete: total={:?}, rows={}",
451            total_duration,
452            result.row_count()
453        );
454
455        let plan = plan_builder.build();
456        Ok((result, plan))
457    }
458
459    /// Build a `DataView` from a parsed SQL statement
460    fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
461        let mut cte_context = HashMap::new();
462        self.build_view_with_context(table, statement, &mut cte_context)
463    }
464
465    /// Build a DataView from a SelectStatement with CTE context
466    fn build_view_with_context(
467        &self,
468        table: Arc<DataTable>,
469        statement: SelectStatement,
470        cte_context: &mut HashMap<String, Arc<DataView>>,
471    ) -> Result<DataView> {
472        let mut dummy_plan = ExecutionPlanBuilder::new();
473        self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
474    }
475
476    /// Build a DataView from a SelectStatement with CTE context and execution plan tracking
477    fn build_view_with_context_and_plan(
478        &self,
479        table: Arc<DataTable>,
480        statement: SelectStatement,
481        cte_context: &mut HashMap<String, Arc<DataView>>,
482        plan: &mut ExecutionPlanBuilder,
483    ) -> Result<DataView> {
484        // First, process any CTEs that aren't already in the context
485        for cte in &statement.ctes {
486            // Skip if already processed (e.g., by execute_select for WEB CTEs)
487            if cte_context.contains_key(&cte.name) {
488                debug!(
489                    "QueryEngine: CTE '{}' already in context, skipping",
490                    cte.name
491                );
492                continue;
493            }
494
495            debug!("QueryEngine: Processing CTE '{}'...", cte.name);
496            debug!(
497                "QueryEngine: Available CTEs for '{}': {:?}",
498                cte.name,
499                cte_context.keys().collect::<Vec<_>>()
500            );
501
502            // Execute the CTE query (it might reference earlier CTEs)
503            let cte_result = match &cte.cte_type {
504                CTEType::Standard(query) => {
505                    let view =
506                        self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
507
508                    // Materialize the view and enrich columns with qualified names
509                    let mut materialized = self.materialize_view(view)?;
510
511                    // Enrich columns with qualified names for proper scoping
512                    for column in materialized.columns_mut() {
513                        column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
514                        column.source_table = Some(cte.name.clone());
515                    }
516
517                    DataView::new(Arc::new(materialized))
518                }
519                CTEType::Web(_web_spec) => {
520                    // Web CTEs should have been processed earlier in execute_select
521                    return Err(anyhow!(
522                        "Web CTEs should be processed in execute_select method"
523                    ));
524                }
525            };
526
527            // Store the result in the context for later use
528            cte_context.insert(cte.name.clone(), Arc::new(cte_result));
529            debug!(
530                "QueryEngine: CTE '{}' processed, stored in context",
531                cte.name
532            );
533        }
534
535        // Determine the source table for the main query
536        let source_table = if let Some(ref table_func) = statement.from_function {
537            // Handle table functions like RANGE()
538            debug!("QueryEngine: Processing table function...");
539            match table_func {
540                TableFunction::Generator { name, args } => {
541                    // Use the generator registry to create the table
542                    use crate::sql::generators::GeneratorRegistry;
543
544                    // Create generator registry (could be cached in QueryEngine)
545                    let registry = GeneratorRegistry::new();
546
547                    if let Some(generator) = registry.get(name) {
548                        // Evaluate arguments
549                        let mut evaluator = ArithmeticEvaluator::with_date_notation(
550                            &table,
551                            self.date_notation.clone(),
552                        );
553                        let dummy_row = 0;
554
555                        let mut evaluated_args = Vec::new();
556                        for arg in args {
557                            evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
558                        }
559
560                        // Generate the table
561                        generator.generate(evaluated_args)?
562                    } else {
563                        return Err(anyhow!("Unknown generator function: {}", name));
564                    }
565                }
566            }
567        } else if let Some(ref subquery) = statement.from_subquery {
568            // Execute the subquery and use its result as the source
569            debug!("QueryEngine: Processing FROM subquery...");
570            let subquery_result =
571                self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
572
573            // Convert the DataView to a DataTable for use as source
574            // This materializes the subquery result
575            let materialized = self.materialize_view(subquery_result)?;
576            Arc::new(materialized)
577        } else if let Some(ref table_name) = statement.from_table {
578            // Check if this references a CTE
579            if let Some(cte_view) = cte_context.get(table_name) {
580                debug!("QueryEngine: Using CTE '{}' as source table", table_name);
581                // Materialize the CTE view as a table
582                let materialized = self.materialize_view((**cte_view).clone())?;
583                Arc::new(materialized)
584            } else {
585                // Regular table reference - use the provided table
586                table.clone()
587            }
588        } else {
589            // No FROM clause - use the provided table
590            table.clone()
591        };
592
593        // Process JOINs if present
594        let final_table = if !statement.joins.is_empty() {
595            plan.begin_step(
596                StepType::Join,
597                format!("Process {} JOINs", statement.joins.len()),
598            );
599            plan.set_rows_in(source_table.row_count());
600
601            let join_executor = HashJoinExecutor::new(self.case_insensitive);
602            let mut current_table = source_table;
603
604            for (idx, join_clause) in statement.joins.iter().enumerate() {
605                let join_start = Instant::now();
606                plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
607                plan.add_detail(format!("Type: {:?}", join_clause.join_type));
608                plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
609                plan.add_detail(format!(
610                    "Executing {:?} JOIN on {}",
611                    join_clause.join_type, join_clause.condition.left_column
612                ));
613
614                // Resolve the right table for the join
615                let right_table = match &join_clause.table {
616                    TableSource::Table(name) => {
617                        // Check if it's a CTE reference
618                        if let Some(cte_view) = cte_context.get(name) {
619                            let materialized = self.materialize_view((**cte_view).clone())?;
620                            Arc::new(materialized)
621                        } else {
622                            // For now, we need the actual table data
623                            // In a real implementation, this would load from file
624                            return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
625                        }
626                    }
627                    TableSource::DerivedTable { query, alias: _ } => {
628                        // Execute the subquery
629                        let subquery_result = self.build_view_with_context(
630                            table.clone(),
631                            *query.clone(),
632                            cte_context,
633                        )?;
634                        let materialized = self.materialize_view(subquery_result)?;
635                        Arc::new(materialized)
636                    }
637                };
638
639                // Execute the join
640                let joined = join_executor.execute_join(
641                    current_table.clone(),
642                    join_clause,
643                    right_table.clone(),
644                )?;
645
646                plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
647                plan.set_rows_out(joined.row_count());
648                plan.add_detail(format!("Result: {} rows", joined.row_count()));
649                plan.add_detail(format!(
650                    "Join time: {:.3}ms",
651                    join_start.elapsed().as_secs_f64() * 1000.0
652                ));
653                plan.end_step();
654
655                current_table = Arc::new(joined);
656            }
657
658            plan.set_rows_out(current_table.row_count());
659            plan.add_detail(format!(
660                "Final result after all joins: {} rows",
661                current_table.row_count()
662            ));
663            plan.end_step();
664            current_table
665        } else {
666            source_table
667        };
668
669        // Continue with the existing build_view logic but using final_table
670        self.build_view_internal_with_plan(final_table, statement, plan)
671    }
672
673    /// Materialize a DataView into a new DataTable
674    fn materialize_view(&self, view: DataView) -> Result<DataTable> {
675        let source = view.source();
676        let mut result_table = DataTable::new("derived");
677
678        // Get the visible columns from the view
679        let visible_cols = view.visible_column_indices().to_vec();
680
681        // Copy column definitions
682        for col_idx in &visible_cols {
683            let col = &source.columns[*col_idx];
684            let new_col = DataColumn {
685                name: col.name.clone(),
686                data_type: col.data_type.clone(),
687                nullable: col.nullable,
688                unique_values: col.unique_values,
689                null_count: col.null_count,
690                metadata: col.metadata.clone(),
691                qualified_name: col.qualified_name.clone(), // Preserve qualified name
692                source_table: col.source_table.clone(),     // Preserve source table
693            };
694            result_table.add_column(new_col);
695        }
696
697        // Copy visible rows
698        for row_idx in view.visible_row_indices() {
699            let source_row = &source.rows[*row_idx];
700            let mut new_row = DataRow { values: Vec::new() };
701
702            for col_idx in &visible_cols {
703                new_row.values.push(source_row.values[*col_idx].clone());
704            }
705
706            result_table.add_row(new_row);
707        }
708
709        Ok(result_table)
710    }
711
712    fn build_view_internal(
713        &self,
714        table: Arc<DataTable>,
715        statement: SelectStatement,
716    ) -> Result<DataView> {
717        let mut dummy_plan = ExecutionPlanBuilder::new();
718        self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
719    }
720
721    fn build_view_internal_with_plan(
722        &self,
723        table: Arc<DataTable>,
724        statement: SelectStatement,
725        plan: &mut ExecutionPlanBuilder,
726    ) -> Result<DataView> {
727        debug!(
728            "QueryEngine::build_view - select_items: {:?}",
729            statement.select_items
730        );
731        debug!(
732            "QueryEngine::build_view - where_clause: {:?}",
733            statement.where_clause
734        );
735
736        // Start with all rows visible
737        let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
738
739        // Apply WHERE clause filtering using recursive evaluator
740        if let Some(where_clause) = &statement.where_clause {
741            let total_rows = table.row_count();
742            debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
743            debug!("QueryEngine: WHERE clause = {:?}", where_clause);
744
745            plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
746            plan.set_rows_in(total_rows);
747            plan.add_detail(format!("Input: {} rows", total_rows));
748
749            // Add details about WHERE conditions
750            for condition in &where_clause.conditions {
751                plan.add_detail(format!("Condition: {:?}", condition.expr));
752            }
753
754            let filter_start = Instant::now();
755            // Create an evaluation context for caching compiled regexes
756            let mut eval_context = EvaluationContext::new(self.case_insensitive);
757
758            // Filter visible rows based on WHERE clause
759            let mut filtered_rows = Vec::new();
760            for row_idx in visible_rows {
761                // Only log for first few rows to avoid performance impact
762                if row_idx < 3 {
763                    debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
764                }
765                let mut evaluator =
766                    RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
767                match evaluator.evaluate(where_clause, row_idx) {
768                    Ok(result) => {
769                        if row_idx < 3 {
770                            debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
771                        }
772                        if result {
773                            filtered_rows.push(row_idx);
774                        }
775                    }
776                    Err(e) => {
777                        if row_idx < 3 {
778                            debug!(
779                                "QueryEngine: WHERE evaluation error for row {}: {}",
780                                row_idx, e
781                            );
782                        }
783                        // Propagate WHERE clause errors instead of silently ignoring them
784                        return Err(e);
785                    }
786                }
787            }
788
789            // Log regex cache statistics
790            let (compilations, cache_hits) = eval_context.get_stats();
791            if compilations > 0 || cache_hits > 0 {
792                debug!(
793                    "LIKE pattern cache: {} compilations, {} cache hits",
794                    compilations, cache_hits
795                );
796            }
797            visible_rows = filtered_rows;
798            let filter_duration = filter_start.elapsed();
799            info!(
800                "WHERE clause filtering: {} rows -> {} rows in {:?}",
801                total_rows,
802                visible_rows.len(),
803                filter_duration
804            );
805
806            plan.set_rows_out(visible_rows.len());
807            plan.add_detail(format!("Output: {} rows", visible_rows.len()));
808            plan.add_detail(format!(
809                "Filter time: {:.3}ms",
810                filter_duration.as_secs_f64() * 1000.0
811            ));
812            plan.end_step();
813        }
814
815        // Create initial DataView with filtered rows
816        let mut view = DataView::new(table.clone());
817        view = view.with_rows(visible_rows);
818
819        // Handle GROUP BY if present
820        if let Some(group_by_exprs) = &statement.group_by {
821            if !group_by_exprs.is_empty() {
822                debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
823
824                plan.begin_step(
825                    StepType::GroupBy,
826                    format!("GROUP BY {} expressions", group_by_exprs.len()),
827                );
828                plan.set_rows_in(view.row_count());
829                plan.add_detail(format!("Input: {} rows", view.row_count()));
830                for expr in group_by_exprs {
831                    plan.add_detail(format!("Group by: {:?}", expr));
832                }
833
834                let group_start = Instant::now();
835                view = self.apply_group_by(
836                    view,
837                    group_by_exprs,
838                    &statement.select_items,
839                    statement.having.as_ref(),
840                )?;
841
842                plan.set_rows_out(view.row_count());
843                plan.add_detail(format!("Output: {} groups", view.row_count()));
844                plan.add_detail(format!(
845                    "Group time: {:.3}ms",
846                    group_start.elapsed().as_secs_f64() * 1000.0
847                ));
848                plan.end_step();
849            }
850        } else {
851            // Apply column projection or computed expressions (SELECT clause) - do this AFTER filtering
852            if !statement.select_items.is_empty() {
853                // Check if we have ANY non-star items (not just the first one)
854                let has_non_star_items = statement
855                    .select_items
856                    .iter()
857                    .any(|item| !matches!(item, SelectItem::Star));
858
859                // Apply select items if:
860                // 1. We have computed expressions or explicit columns
861                // 2. OR we have a mix of star and other items (e.g., SELECT *, computed_col)
862                if has_non_star_items || statement.select_items.len() > 1 {
863                    view = self.apply_select_items(view, &statement.select_items)?;
864                } else {
865                }
866                // If it's just a single star, no projection needed
867            } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
868                debug!("QueryEngine: Using legacy columns path");
869                // Fallback to legacy column projection for backward compatibility
870                // Use the current view's source table, not the original table
871                let source_table = view.source();
872                let column_indices =
873                    self.resolve_column_indices(source_table, &statement.columns)?;
874                view = view.with_columns(column_indices);
875            }
876        }
877
878        // Apply DISTINCT if specified
879        if statement.distinct {
880            plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
881            plan.set_rows_in(view.row_count());
882            plan.add_detail(format!("Input: {} rows", view.row_count()));
883
884            let distinct_start = Instant::now();
885            view = self.apply_distinct(view)?;
886
887            plan.set_rows_out(view.row_count());
888            plan.add_detail(format!("Output: {} unique rows", view.row_count()));
889            plan.add_detail(format!(
890                "Distinct time: {:.3}ms",
891                distinct_start.elapsed().as_secs_f64() * 1000.0
892            ));
893            plan.end_step();
894        }
895
896        // Apply ORDER BY sorting
897        if let Some(order_by_columns) = &statement.order_by {
898            if !order_by_columns.is_empty() {
899                plan.begin_step(
900                    StepType::Sort,
901                    format!("ORDER BY {} columns", order_by_columns.len()),
902                );
903                plan.set_rows_in(view.row_count());
904                for col in order_by_columns {
905                    plan.add_detail(format!("{} {:?}", col.column, col.direction));
906                }
907
908                let sort_start = Instant::now();
909                view = self.apply_multi_order_by(view, order_by_columns)?;
910
911                plan.add_detail(format!(
912                    "Sort time: {:.3}ms",
913                    sort_start.elapsed().as_secs_f64() * 1000.0
914                ));
915                plan.end_step();
916            }
917        }
918
919        // Apply LIMIT/OFFSET
920        if let Some(limit) = statement.limit {
921            let offset = statement.offset.unwrap_or(0);
922            plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
923            plan.set_rows_in(view.row_count());
924            if offset > 0 {
925                plan.add_detail(format!("OFFSET: {}", offset));
926            }
927            view = view.with_limit(limit, offset);
928            plan.set_rows_out(view.row_count());
929            plan.add_detail(format!("Output: {} rows", view.row_count()));
930            plan.end_step();
931        }
932
933        Ok(view)
934    }
935
936    /// Resolve column names to indices
937    fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
938        let mut indices = Vec::new();
939        let table_columns = table.column_names();
940
941        for col_name in columns {
942            let index = table_columns
943                .iter()
944                .position(|c| c.eq_ignore_ascii_case(col_name))
945                .ok_or_else(|| {
946                    let suggestion = self.find_similar_column(table, col_name);
947                    match suggestion {
948                        Some(similar) => anyhow::anyhow!(
949                            "Column '{}' not found. Did you mean '{}'?",
950                            col_name,
951                            similar
952                        ),
953                        None => anyhow::anyhow!("Column '{}' not found", col_name),
954                    }
955                })?;
956            indices.push(index);
957        }
958
959        Ok(indices)
960    }
961
962    /// Apply SELECT items (columns and computed expressions) to create new view
963    fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
964        debug!(
965            "QueryEngine::apply_select_items - items: {:?}",
966            select_items
967        );
968        debug!(
969            "QueryEngine::apply_select_items - input view has {} rows",
970            view.row_count()
971        );
972
973        // Check if this is an aggregate query:
974        // 1. At least one aggregate function exists
975        // 2. All other items are either aggregates or constants (aggregate-compatible)
976        let has_aggregates = select_items.iter().any(|item| match item {
977            SelectItem::Expression { expr, .. } => contains_aggregate(expr),
978            SelectItem::Column(_) => false,
979            SelectItem::Star => false,
980        });
981
982        let all_aggregate_compatible = select_items.iter().all(|item| match item {
983            SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
984            SelectItem::Column(_) => false, // Columns are not aggregate-compatible
985            SelectItem::Star => false,      // Star is not aggregate-compatible
986        });
987
988        if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
989            // Special handling for aggregate queries with constants (no GROUP BY)
990            // These should produce exactly one row
991            debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
992            return self.apply_aggregate_select(view, select_items);
993        }
994
995        // Check if we need to create computed columns
996        let has_computed_expressions = select_items
997            .iter()
998            .any(|item| matches!(item, SelectItem::Expression { .. }));
999
1000        debug!(
1001            "QueryEngine::apply_select_items - has_computed_expressions: {}",
1002            has_computed_expressions
1003        );
1004
1005        if !has_computed_expressions {
1006            // Simple case: only columns, use existing projection logic
1007            let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1008            return Ok(view.with_columns(column_indices));
1009        }
1010
1011        // Complex case: we have computed expressions
1012        // IMPORTANT: We create a PROJECTED view, not a new table
1013        // This preserves the original DataTable reference
1014
1015        let source_table = view.source();
1016        let visible_rows = view.visible_row_indices();
1017
1018        // Create a temporary table just for the computed result view
1019        // But this table is only used for the current query result
1020        let mut computed_table = DataTable::new("query_result");
1021
1022        // First, expand any Star selectors to actual columns
1023        let mut expanded_items = Vec::new();
1024        for item in select_items {
1025            match item {
1026                SelectItem::Star => {
1027                    // Expand * to all columns from source table
1028                    for col_name in source_table.column_names() {
1029                        expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1030                            col_name.to_string(),
1031                        )));
1032                    }
1033                }
1034                _ => expanded_items.push(item.clone()),
1035            }
1036        }
1037
1038        // Add columns based on expanded SelectItems, handling duplicates
1039        let mut column_name_counts: std::collections::HashMap<String, usize> =
1040            std::collections::HashMap::new();
1041
1042        for item in &expanded_items {
1043            let base_name = match item {
1044                SelectItem::Column(col_ref) => col_ref.name.clone(),
1045                SelectItem::Expression { alias, .. } => alias.clone(),
1046                SelectItem::Star => unreachable!("Star should have been expanded"),
1047            };
1048
1049            // Check if this column name has been used before
1050            let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1051            let column_name = if *count == 0 {
1052                // First occurrence, use the name as-is
1053                base_name.clone()
1054            } else {
1055                // Duplicate, append a suffix
1056                format!("{base_name}_{count}")
1057            };
1058            *count += 1;
1059
1060            computed_table.add_column(DataColumn::new(&column_name));
1061        }
1062
1063        // Calculate values for each row
1064        let mut evaluator =
1065            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1066
1067        for &row_idx in visible_rows {
1068            let mut row_values = Vec::new();
1069
1070            for item in &expanded_items {
1071                let value = match item {
1072                    SelectItem::Column(col_ref) => {
1073                        // Column reference with optional table prefix
1074                        let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1075                            // For qualified references, ONLY try qualified lookup - no fallback
1076                            let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1077                            let result = source_table.find_column_by_qualified_name(&qualified_name);
1078
1079                            // Debug: log what qualified names are available when lookup fails
1080                            if result.is_none() {
1081                                let available_qualified: Vec<String> = source_table.columns.iter()
1082                                    .filter_map(|c| c.qualified_name.clone())
1083                                    .collect();
1084                                let available_simple: Vec<String> = source_table.columns.iter()
1085                                    .map(|c| c.name.clone())
1086                                    .collect();
1087                                debug!(
1088                                    "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1089                                    qualified_name, available_qualified, available_simple
1090                                );
1091                                // This MUST return None to trigger error
1092                            }
1093                            result
1094                        } else {
1095                            // Simple column name lookup
1096                            source_table.get_column_index(&col_ref.name)
1097                        }
1098                        .ok_or_else(|| {
1099                            let display_name = if let Some(prefix) = &col_ref.table_prefix {
1100                                format!("{}.{}", prefix, col_ref.name)
1101                            } else {
1102                                col_ref.name.clone()
1103                            };
1104                            let suggestion = self.find_similar_column(source_table, &col_ref.name);
1105                            match suggestion {
1106                                Some(similar) => anyhow::anyhow!(
1107                                    "Column '{}' not found. Did you mean '{}'?",
1108                                    display_name,
1109                                    similar
1110                                ),
1111                                None => anyhow::anyhow!("Column '{}' not found", display_name),
1112                            }
1113                        })?;
1114                        let row = source_table
1115                            .get_row(row_idx)
1116                            .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1117                        row.get(col_idx)
1118                            .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1119                            .clone()
1120                    }
1121                    SelectItem::Expression { expr, .. } => {
1122                        // Computed expression
1123                        evaluator.evaluate(expr, row_idx)?
1124                    }
1125                    SelectItem::Star => unreachable!("Star should have been expanded"),
1126                };
1127                row_values.push(value);
1128            }
1129
1130            computed_table
1131                .add_row(DataRow::new(row_values))
1132                .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1133        }
1134
1135        // Return a view of the computed result
1136        // This is a temporary view for this query only
1137        Ok(DataView::new(Arc::new(computed_table)))
1138    }
1139
1140    /// Apply aggregate-only SELECT (no GROUP BY - produces single row)
1141    fn apply_aggregate_select(
1142        &self,
1143        view: DataView,
1144        select_items: &[SelectItem],
1145    ) -> Result<DataView> {
1146        debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1147
1148        let source_table = view.source();
1149        let mut result_table = DataTable::new("aggregate_result");
1150
1151        // Add columns for each select item
1152        for item in select_items {
1153            let column_name = match item {
1154                SelectItem::Expression { alias, .. } => alias.clone(),
1155                _ => unreachable!("Should only have expressions in aggregate-only query"),
1156            };
1157            result_table.add_column(DataColumn::new(&column_name));
1158        }
1159
1160        // Create evaluator with visible rows from the view (for filtered aggregates)
1161        let visible_rows = view.visible_row_indices().to_vec();
1162        let mut evaluator =
1163            ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1164                .with_visible_rows(visible_rows);
1165
1166        // Evaluate each aggregate expression once (they handle all rows internally)
1167        let mut row_values = Vec::new();
1168        for item in select_items {
1169            match item {
1170                SelectItem::Expression { expr, .. } => {
1171                    // The evaluator will handle aggregates over all rows
1172                    // We pass row_index=0 but aggregates ignore it and process all rows
1173                    let value = evaluator.evaluate(expr, 0)?;
1174                    row_values.push(value);
1175                }
1176                _ => unreachable!("Should only have expressions in aggregate-only query"),
1177            }
1178        }
1179
1180        // Add the single result row
1181        result_table
1182            .add_row(DataRow::new(row_values))
1183            .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1184
1185        Ok(DataView::new(Arc::new(result_table)))
1186    }
1187
1188    /// Resolve `SelectItem` columns to indices (for simple column projections only)
1189    fn resolve_select_columns(
1190        &self,
1191        table: &DataTable,
1192        select_items: &[SelectItem],
1193    ) -> Result<Vec<usize>> {
1194        let mut indices = Vec::new();
1195        let table_columns = table.column_names();
1196
1197        for item in select_items {
1198            match item {
1199                SelectItem::Column(col_ref) => {
1200                    // Check if this has a table prefix
1201                    let index = if let Some(table_prefix) = &col_ref.table_prefix {
1202                        // For qualified references, ONLY try qualified lookup - no fallback
1203                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1204                        table.find_column_by_qualified_name(&qualified_name)
1205                            .ok_or_else(|| {
1206                                // Check if any columns have qualified names for better error message
1207                                let has_qualified = table.columns.iter()
1208                                    .any(|c| c.qualified_name.is_some());
1209                                if !has_qualified {
1210                                    anyhow::anyhow!(
1211                                        "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1212                                        qualified_name, table_prefix
1213                                    )
1214                                } else {
1215                                    anyhow::anyhow!("Column '{}' not found", qualified_name)
1216                                }
1217                            })?
1218                    } else {
1219                        // Simple column name lookup
1220                        table_columns
1221                            .iter()
1222                            .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1223                            .ok_or_else(|| {
1224                                let suggestion = self.find_similar_column(table, &col_ref.name);
1225                                match suggestion {
1226                                    Some(similar) => anyhow::anyhow!(
1227                                        "Column '{}' not found. Did you mean '{}'?",
1228                                        col_ref.name,
1229                                        similar
1230                                    ),
1231                                    None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1232                                }
1233                            })?
1234                    };
1235                    indices.push(index);
1236                }
1237                SelectItem::Star => {
1238                    // Expand * to all column indices
1239                    for i in 0..table_columns.len() {
1240                        indices.push(i);
1241                    }
1242                }
1243                SelectItem::Expression { .. } => {
1244                    return Err(anyhow::anyhow!(
1245                        "Computed expressions require new table creation"
1246                    ));
1247                }
1248            }
1249        }
1250
1251        Ok(indices)
1252    }
1253
1254    /// Apply DISTINCT to remove duplicate rows
1255    fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1256        use std::collections::HashSet;
1257
1258        let source = view.source();
1259        let visible_cols = view.visible_column_indices();
1260        let visible_rows = view.visible_row_indices();
1261
1262        // Build a set to track unique rows
1263        let mut seen_rows = HashSet::new();
1264        let mut unique_row_indices = Vec::new();
1265
1266        for &row_idx in visible_rows {
1267            // Build a key representing this row's visible column values
1268            let mut row_key = Vec::new();
1269            for &col_idx in visible_cols {
1270                let value = source
1271                    .get_value(row_idx, col_idx)
1272                    .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1273                // Convert value to a hashable representation
1274                row_key.push(format!("{:?}", value));
1275            }
1276
1277            // Check if we've seen this row before
1278            if seen_rows.insert(row_key) {
1279                // First time seeing this row combination
1280                unique_row_indices.push(row_idx);
1281            }
1282        }
1283
1284        // Create a new view with only unique rows
1285        Ok(view.with_rows(unique_row_indices))
1286    }
1287
1288    /// Apply multi-column ORDER BY sorting to the view
1289    fn apply_multi_order_by(
1290        &self,
1291        mut view: DataView,
1292        order_by_columns: &[OrderByColumn],
1293    ) -> Result<DataView> {
1294        // Build list of (source_column_index, ascending) tuples
1295        let mut sort_columns = Vec::new();
1296
1297        for order_col in order_by_columns {
1298            // Try to find the column index
1299            // Check in the current view's source table (this handles both regular columns and computed columns)
1300            // This is especially important after GROUP BY where we have a new result table with aggregate aliases
1301            let col_index = view
1302                .source()
1303                .get_column_index(&order_col.column)
1304                .ok_or_else(|| {
1305                    // If not found, provide helpful error with suggestions
1306                    let suggestion = self.find_similar_column(view.source(), &order_col.column);
1307                    match suggestion {
1308                        Some(similar) => anyhow::anyhow!(
1309                            "Column '{}' not found. Did you mean '{}'?",
1310                            order_col.column,
1311                            similar
1312                        ),
1313                        None => {
1314                            // Also list available columns for debugging
1315                            let available_cols = view.source().column_names().join(", ");
1316                            anyhow::anyhow!(
1317                                "Column '{}' not found. Available columns: {}",
1318                                order_col.column,
1319                                available_cols
1320                            )
1321                        }
1322                    }
1323                })?;
1324
1325            let ascending = matches!(order_col.direction, SortDirection::Asc);
1326            sort_columns.push((col_index, ascending));
1327        }
1328
1329        // Apply multi-column sorting
1330        view.apply_multi_sort(&sort_columns)?;
1331        Ok(view)
1332    }
1333
1334    /// Apply GROUP BY to the view with optional HAVING clause
1335    fn apply_group_by(
1336        &self,
1337        view: DataView,
1338        group_by_exprs: &[SqlExpression],
1339        select_items: &[SelectItem],
1340        having: Option<&SqlExpression>,
1341    ) -> Result<DataView> {
1342        // Use the new expression-based GROUP BY implementation
1343        self.apply_group_by_expressions(
1344            view,
1345            group_by_exprs,
1346            select_items,
1347            having,
1348            self.case_insensitive,
1349            self.date_notation.clone(),
1350        )
1351    }
1352
1353    /// Estimate the cardinality (number of unique groups) for GROUP BY operations
1354    /// This helps pre-size hash tables for better performance
1355    pub fn estimate_group_cardinality(
1356        &self,
1357        view: &DataView,
1358        group_by_exprs: &[SqlExpression],
1359    ) -> usize {
1360        // If we have few rows, just return the row count as upper bound
1361        let row_count = view.get_visible_rows().len();
1362        if row_count <= 100 {
1363            return row_count;
1364        }
1365
1366        // Sample first 1000 rows or 10% of data, whichever is smaller
1367        let sample_size = min(1000, row_count / 10).max(100);
1368        let mut seen = FxHashSet::default();
1369
1370        let visible_rows = view.get_visible_rows();
1371        for (i, &row_idx) in visible_rows.iter().enumerate() {
1372            if i >= sample_size {
1373                break;
1374            }
1375
1376            // Evaluate GROUP BY expressions for this row
1377            let mut key_values = Vec::new();
1378            for expr in group_by_exprs {
1379                let mut evaluator = ArithmeticEvaluator::new(view.source());
1380                let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1381                key_values.push(value);
1382            }
1383
1384            seen.insert(key_values);
1385        }
1386
1387        // Estimate total cardinality based on sample
1388        let sample_cardinality = seen.len();
1389        let estimated = (sample_cardinality * row_count) / sample_size;
1390
1391        // Cap at row count and ensure minimum of sample cardinality
1392        estimated.min(row_count).max(sample_cardinality)
1393    }
1394}
1395
1396#[cfg(test)]
1397mod tests {
1398    use super::*;
1399    use crate::data::datatable::{DataColumn, DataRow, DataValue};
1400
1401    fn create_test_table() -> Arc<DataTable> {
1402        let mut table = DataTable::new("test");
1403
1404        // Add columns
1405        table.add_column(DataColumn::new("id"));
1406        table.add_column(DataColumn::new("name"));
1407        table.add_column(DataColumn::new("age"));
1408
1409        // Add rows
1410        table
1411            .add_row(DataRow::new(vec![
1412                DataValue::Integer(1),
1413                DataValue::String("Alice".to_string()),
1414                DataValue::Integer(30),
1415            ]))
1416            .unwrap();
1417
1418        table
1419            .add_row(DataRow::new(vec![
1420                DataValue::Integer(2),
1421                DataValue::String("Bob".to_string()),
1422                DataValue::Integer(25),
1423            ]))
1424            .unwrap();
1425
1426        table
1427            .add_row(DataRow::new(vec![
1428                DataValue::Integer(3),
1429                DataValue::String("Charlie".to_string()),
1430                DataValue::Integer(35),
1431            ]))
1432            .unwrap();
1433
1434        Arc::new(table)
1435    }
1436
1437    #[test]
1438    fn test_select_all() {
1439        let table = create_test_table();
1440        let engine = QueryEngine::new();
1441
1442        let view = engine
1443            .execute(table.clone(), "SELECT * FROM users")
1444            .unwrap();
1445        assert_eq!(view.row_count(), 3);
1446        assert_eq!(view.column_count(), 3);
1447    }
1448
1449    #[test]
1450    fn test_select_columns() {
1451        let table = create_test_table();
1452        let engine = QueryEngine::new();
1453
1454        let view = engine
1455            .execute(table.clone(), "SELECT name, age FROM users")
1456            .unwrap();
1457        assert_eq!(view.row_count(), 3);
1458        assert_eq!(view.column_count(), 2);
1459    }
1460
1461    #[test]
1462    fn test_select_with_limit() {
1463        let table = create_test_table();
1464        let engine = QueryEngine::new();
1465
1466        let view = engine
1467            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1468            .unwrap();
1469        assert_eq!(view.row_count(), 2);
1470    }
1471
1472    #[test]
1473    fn test_type_coercion_contains() {
1474        // Initialize tracing for debug output
1475        let _ = tracing_subscriber::fmt()
1476            .with_max_level(tracing::Level::DEBUG)
1477            .try_init();
1478
1479        let mut table = DataTable::new("test");
1480        table.add_column(DataColumn::new("id"));
1481        table.add_column(DataColumn::new("status"));
1482        table.add_column(DataColumn::new("price"));
1483
1484        // Add test data with mixed types
1485        table
1486            .add_row(DataRow::new(vec![
1487                DataValue::Integer(1),
1488                DataValue::String("Pending".to_string()),
1489                DataValue::Float(99.99),
1490            ]))
1491            .unwrap();
1492
1493        table
1494            .add_row(DataRow::new(vec![
1495                DataValue::Integer(2),
1496                DataValue::String("Confirmed".to_string()),
1497                DataValue::Float(150.50),
1498            ]))
1499            .unwrap();
1500
1501        table
1502            .add_row(DataRow::new(vec![
1503                DataValue::Integer(3),
1504                DataValue::String("Pending".to_string()),
1505                DataValue::Float(75.00),
1506            ]))
1507            .unwrap();
1508
1509        let table = Arc::new(table);
1510        let engine = QueryEngine::new();
1511
1512        println!("\n=== Testing WHERE clause with Contains ===");
1513        println!("Table has {} rows", table.row_count());
1514        for i in 0..table.row_count() {
1515            let status = table.get_value(i, 1);
1516            println!("Row {i}: status = {status:?}");
1517        }
1518
1519        // Test 1: Basic string contains (should work)
1520        println!("\n--- Test 1: status.Contains('pend') ---");
1521        let result = engine.execute(
1522            table.clone(),
1523            "SELECT * FROM test WHERE status.Contains('pend')",
1524        );
1525        match result {
1526            Ok(view) => {
1527                println!("SUCCESS: Found {} matching rows", view.row_count());
1528                assert_eq!(view.row_count(), 2); // Should find both Pending rows
1529            }
1530            Err(e) => {
1531                panic!("Query failed: {e}");
1532            }
1533        }
1534
1535        // Test 2: Numeric contains (should work with type coercion)
1536        println!("\n--- Test 2: price.Contains('9') ---");
1537        let result = engine.execute(
1538            table.clone(),
1539            "SELECT * FROM test WHERE price.Contains('9')",
1540        );
1541        match result {
1542            Ok(view) => {
1543                println!(
1544                    "SUCCESS: Found {} matching rows with price containing '9'",
1545                    view.row_count()
1546                );
1547                // Should find 99.99 row
1548                assert!(view.row_count() >= 1);
1549            }
1550            Err(e) => {
1551                panic!("Numeric coercion query failed: {e}");
1552            }
1553        }
1554
1555        println!("\n=== All tests passed! ===");
1556    }
1557
1558    #[test]
1559    fn test_not_in_clause() {
1560        // Initialize tracing for debug output
1561        let _ = tracing_subscriber::fmt()
1562            .with_max_level(tracing::Level::DEBUG)
1563            .try_init();
1564
1565        let mut table = DataTable::new("test");
1566        table.add_column(DataColumn::new("id"));
1567        table.add_column(DataColumn::new("country"));
1568
1569        // Add test data
1570        table
1571            .add_row(DataRow::new(vec![
1572                DataValue::Integer(1),
1573                DataValue::String("CA".to_string()),
1574            ]))
1575            .unwrap();
1576
1577        table
1578            .add_row(DataRow::new(vec![
1579                DataValue::Integer(2),
1580                DataValue::String("US".to_string()),
1581            ]))
1582            .unwrap();
1583
1584        table
1585            .add_row(DataRow::new(vec![
1586                DataValue::Integer(3),
1587                DataValue::String("UK".to_string()),
1588            ]))
1589            .unwrap();
1590
1591        let table = Arc::new(table);
1592        let engine = QueryEngine::new();
1593
1594        println!("\n=== Testing NOT IN clause ===");
1595        println!("Table has {} rows", table.row_count());
1596        for i in 0..table.row_count() {
1597            let country = table.get_value(i, 1);
1598            println!("Row {i}: country = {country:?}");
1599        }
1600
1601        // Test NOT IN clause - should exclude CA, return US and UK (2 rows)
1602        println!("\n--- Test: country NOT IN ('CA') ---");
1603        let result = engine.execute(
1604            table.clone(),
1605            "SELECT * FROM test WHERE country NOT IN ('CA')",
1606        );
1607        match result {
1608            Ok(view) => {
1609                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1610                assert_eq!(view.row_count(), 2); // Should find US and UK
1611            }
1612            Err(e) => {
1613                panic!("NOT IN query failed: {e}");
1614            }
1615        }
1616
1617        println!("\n=== NOT IN test complete! ===");
1618    }
1619
1620    #[test]
1621    fn test_case_insensitive_in_and_not_in() {
1622        // Initialize tracing for debug output
1623        let _ = tracing_subscriber::fmt()
1624            .with_max_level(tracing::Level::DEBUG)
1625            .try_init();
1626
1627        let mut table = DataTable::new("test");
1628        table.add_column(DataColumn::new("id"));
1629        table.add_column(DataColumn::new("country"));
1630
1631        // Add test data with mixed case
1632        table
1633            .add_row(DataRow::new(vec![
1634                DataValue::Integer(1),
1635                DataValue::String("CA".to_string()), // uppercase
1636            ]))
1637            .unwrap();
1638
1639        table
1640            .add_row(DataRow::new(vec![
1641                DataValue::Integer(2),
1642                DataValue::String("us".to_string()), // lowercase
1643            ]))
1644            .unwrap();
1645
1646        table
1647            .add_row(DataRow::new(vec![
1648                DataValue::Integer(3),
1649                DataValue::String("UK".to_string()), // uppercase
1650            ]))
1651            .unwrap();
1652
1653        let table = Arc::new(table);
1654
1655        println!("\n=== Testing Case-Insensitive IN clause ===");
1656        println!("Table has {} rows", table.row_count());
1657        for i in 0..table.row_count() {
1658            let country = table.get_value(i, 1);
1659            println!("Row {i}: country = {country:?}");
1660        }
1661
1662        // Test case-insensitive IN - should match 'CA' with 'ca'
1663        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
1664        let engine = QueryEngine::with_case_insensitive(true);
1665        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1666        match result {
1667            Ok(view) => {
1668                println!(
1669                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
1670                    view.row_count()
1671                );
1672                assert_eq!(view.row_count(), 1); // Should find CA row
1673            }
1674            Err(e) => {
1675                panic!("Case-insensitive IN query failed: {e}");
1676            }
1677        }
1678
1679        // Test case-insensitive NOT IN - should exclude 'CA' when searching for 'ca'
1680        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
1681        let result = engine.execute(
1682            table.clone(),
1683            "SELECT * FROM test WHERE country NOT IN ('ca')",
1684        );
1685        match result {
1686            Ok(view) => {
1687                println!(
1688                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
1689                    view.row_count()
1690                );
1691                assert_eq!(view.row_count(), 2); // Should find us and UK rows
1692            }
1693            Err(e) => {
1694                panic!("Case-insensitive NOT IN query failed: {e}");
1695            }
1696        }
1697
1698        // Test case-sensitive (default) - should NOT match 'CA' with 'ca'
1699        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
1700        let engine_case_sensitive = QueryEngine::new(); // defaults to case_insensitive=false
1701        let result = engine_case_sensitive
1702            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1703        match result {
1704            Ok(view) => {
1705                println!(
1706                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
1707                    view.row_count()
1708                );
1709                assert_eq!(view.row_count(), 0); // Should find no rows (CA != ca)
1710            }
1711            Err(e) => {
1712                panic!("Case-sensitive IN query failed: {e}");
1713            }
1714        }
1715
1716        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
1717    }
1718
1719    #[test]
1720    #[ignore = "Parentheses in WHERE clause not yet implemented"]
1721    fn test_parentheses_in_where_clause() {
1722        // Initialize tracing for debug output
1723        let _ = tracing_subscriber::fmt()
1724            .with_max_level(tracing::Level::DEBUG)
1725            .try_init();
1726
1727        let mut table = DataTable::new("test");
1728        table.add_column(DataColumn::new("id"));
1729        table.add_column(DataColumn::new("status"));
1730        table.add_column(DataColumn::new("priority"));
1731
1732        // Add test data
1733        table
1734            .add_row(DataRow::new(vec![
1735                DataValue::Integer(1),
1736                DataValue::String("Pending".to_string()),
1737                DataValue::String("High".to_string()),
1738            ]))
1739            .unwrap();
1740
1741        table
1742            .add_row(DataRow::new(vec![
1743                DataValue::Integer(2),
1744                DataValue::String("Complete".to_string()),
1745                DataValue::String("High".to_string()),
1746            ]))
1747            .unwrap();
1748
1749        table
1750            .add_row(DataRow::new(vec![
1751                DataValue::Integer(3),
1752                DataValue::String("Pending".to_string()),
1753                DataValue::String("Low".to_string()),
1754            ]))
1755            .unwrap();
1756
1757        table
1758            .add_row(DataRow::new(vec![
1759                DataValue::Integer(4),
1760                DataValue::String("Complete".to_string()),
1761                DataValue::String("Low".to_string()),
1762            ]))
1763            .unwrap();
1764
1765        let table = Arc::new(table);
1766        let engine = QueryEngine::new();
1767
1768        println!("\n=== Testing Parentheses in WHERE clause ===");
1769        println!("Table has {} rows", table.row_count());
1770        for i in 0..table.row_count() {
1771            let status = table.get_value(i, 1);
1772            let priority = table.get_value(i, 2);
1773            println!("Row {i}: status = {status:?}, priority = {priority:?}");
1774        }
1775
1776        // Test OR with parentheses - should get (Pending AND High) OR (Complete AND Low)
1777        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
1778        let result = engine.execute(
1779            table.clone(),
1780            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
1781        );
1782        match result {
1783            Ok(view) => {
1784                println!(
1785                    "SUCCESS: Found {} rows with parenthetical logic",
1786                    view.row_count()
1787                );
1788                assert_eq!(view.row_count(), 2); // Should find rows 1 and 4
1789            }
1790            Err(e) => {
1791                panic!("Parentheses query failed: {e}");
1792            }
1793        }
1794
1795        println!("\n=== Parentheses test complete! ===");
1796    }
1797
1798    #[test]
1799    #[ignore = "Numeric type coercion needs fixing"]
1800    fn test_numeric_type_coercion() {
1801        // Initialize tracing for debug output
1802        let _ = tracing_subscriber::fmt()
1803            .with_max_level(tracing::Level::DEBUG)
1804            .try_init();
1805
1806        let mut table = DataTable::new("test");
1807        table.add_column(DataColumn::new("id"));
1808        table.add_column(DataColumn::new("price"));
1809        table.add_column(DataColumn::new("quantity"));
1810
1811        // Add test data with different numeric types
1812        table
1813            .add_row(DataRow::new(vec![
1814                DataValue::Integer(1),
1815                DataValue::Float(99.50), // Contains '.'
1816                DataValue::Integer(100),
1817            ]))
1818            .unwrap();
1819
1820        table
1821            .add_row(DataRow::new(vec![
1822                DataValue::Integer(2),
1823                DataValue::Float(150.0), // Contains '.' and '0'
1824                DataValue::Integer(200),
1825            ]))
1826            .unwrap();
1827
1828        table
1829            .add_row(DataRow::new(vec![
1830                DataValue::Integer(3),
1831                DataValue::Integer(75), // No decimal point
1832                DataValue::Integer(50),
1833            ]))
1834            .unwrap();
1835
1836        let table = Arc::new(table);
1837        let engine = QueryEngine::new();
1838
1839        println!("\n=== Testing Numeric Type Coercion ===");
1840        println!("Table has {} rows", table.row_count());
1841        for i in 0..table.row_count() {
1842            let price = table.get_value(i, 1);
1843            let quantity = table.get_value(i, 2);
1844            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
1845        }
1846
1847        // Test Contains on float values - should find rows with decimal points
1848        println!("\n--- Test: price.Contains('.') ---");
1849        let result = engine.execute(
1850            table.clone(),
1851            "SELECT * FROM test WHERE price.Contains('.')",
1852        );
1853        match result {
1854            Ok(view) => {
1855                println!(
1856                    "SUCCESS: Found {} rows with decimal points in price",
1857                    view.row_count()
1858                );
1859                assert_eq!(view.row_count(), 2); // Should find 99.50 and 150.0
1860            }
1861            Err(e) => {
1862                panic!("Numeric Contains query failed: {e}");
1863            }
1864        }
1865
1866        // Test Contains on integer values converted to string
1867        println!("\n--- Test: quantity.Contains('0') ---");
1868        let result = engine.execute(
1869            table.clone(),
1870            "SELECT * FROM test WHERE quantity.Contains('0')",
1871        );
1872        match result {
1873            Ok(view) => {
1874                println!(
1875                    "SUCCESS: Found {} rows with '0' in quantity",
1876                    view.row_count()
1877                );
1878                assert_eq!(view.row_count(), 2); // Should find 100 and 200
1879            }
1880            Err(e) => {
1881                panic!("Integer Contains query failed: {e}");
1882            }
1883        }
1884
1885        println!("\n=== Numeric type coercion test complete! ===");
1886    }
1887
1888    #[test]
1889    fn test_datetime_comparisons() {
1890        // Initialize tracing for debug output
1891        let _ = tracing_subscriber::fmt()
1892            .with_max_level(tracing::Level::DEBUG)
1893            .try_init();
1894
1895        let mut table = DataTable::new("test");
1896        table.add_column(DataColumn::new("id"));
1897        table.add_column(DataColumn::new("created_date"));
1898
1899        // Add test data with date strings (as they would come from CSV)
1900        table
1901            .add_row(DataRow::new(vec![
1902                DataValue::Integer(1),
1903                DataValue::String("2024-12-15".to_string()),
1904            ]))
1905            .unwrap();
1906
1907        table
1908            .add_row(DataRow::new(vec![
1909                DataValue::Integer(2),
1910                DataValue::String("2025-01-15".to_string()),
1911            ]))
1912            .unwrap();
1913
1914        table
1915            .add_row(DataRow::new(vec![
1916                DataValue::Integer(3),
1917                DataValue::String("2025-02-15".to_string()),
1918            ]))
1919            .unwrap();
1920
1921        let table = Arc::new(table);
1922        let engine = QueryEngine::new();
1923
1924        println!("\n=== Testing DateTime Comparisons ===");
1925        println!("Table has {} rows", table.row_count());
1926        for i in 0..table.row_count() {
1927            let date = table.get_value(i, 1);
1928            println!("Row {i}: created_date = {date:?}");
1929        }
1930
1931        // Test DateTime constructor comparison - should find dates after 2025-01-01
1932        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
1933        let result = engine.execute(
1934            table.clone(),
1935            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
1936        );
1937        match result {
1938            Ok(view) => {
1939                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
1940                assert_eq!(view.row_count(), 2); // Should find 2025-01-15 and 2025-02-15
1941            }
1942            Err(e) => {
1943                panic!("DateTime comparison query failed: {e}");
1944            }
1945        }
1946
1947        println!("\n=== DateTime comparison test complete! ===");
1948    }
1949
1950    #[test]
1951    fn test_not_with_method_calls() {
1952        // Initialize tracing for debug output
1953        let _ = tracing_subscriber::fmt()
1954            .with_max_level(tracing::Level::DEBUG)
1955            .try_init();
1956
1957        let mut table = DataTable::new("test");
1958        table.add_column(DataColumn::new("id"));
1959        table.add_column(DataColumn::new("status"));
1960
1961        // Add test data
1962        table
1963            .add_row(DataRow::new(vec![
1964                DataValue::Integer(1),
1965                DataValue::String("Pending Review".to_string()),
1966            ]))
1967            .unwrap();
1968
1969        table
1970            .add_row(DataRow::new(vec![
1971                DataValue::Integer(2),
1972                DataValue::String("Complete".to_string()),
1973            ]))
1974            .unwrap();
1975
1976        table
1977            .add_row(DataRow::new(vec![
1978                DataValue::Integer(3),
1979                DataValue::String("Pending Approval".to_string()),
1980            ]))
1981            .unwrap();
1982
1983        let table = Arc::new(table);
1984        let engine = QueryEngine::with_case_insensitive(true);
1985
1986        println!("\n=== Testing NOT with Method Calls ===");
1987        println!("Table has {} rows", table.row_count());
1988        for i in 0..table.row_count() {
1989            let status = table.get_value(i, 1);
1990            println!("Row {i}: status = {status:?}");
1991        }
1992
1993        // Test NOT with Contains - should exclude rows containing "pend"
1994        println!("\n--- Test: NOT status.Contains('pend') ---");
1995        let result = engine.execute(
1996            table.clone(),
1997            "SELECT * FROM test WHERE NOT status.Contains('pend')",
1998        );
1999        match result {
2000            Ok(view) => {
2001                println!(
2002                    "SUCCESS: Found {} rows NOT containing 'pend'",
2003                    view.row_count()
2004                );
2005                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2006            }
2007            Err(e) => {
2008                panic!("NOT Contains query failed: {e}");
2009            }
2010        }
2011
2012        // Test NOT with StartsWith
2013        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2014        let result = engine.execute(
2015            table.clone(),
2016            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2017        );
2018        match result {
2019            Ok(view) => {
2020                println!(
2021                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
2022                    view.row_count()
2023                );
2024                assert_eq!(view.row_count(), 1); // Should find only "Complete"
2025            }
2026            Err(e) => {
2027                panic!("NOT StartsWith query failed: {e}");
2028            }
2029        }
2030
2031        println!("\n=== NOT with method calls test complete! ===");
2032    }
2033
2034    #[test]
2035    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2036    fn test_complex_logical_expressions() {
2037        // Initialize tracing for debug output
2038        let _ = tracing_subscriber::fmt()
2039            .with_max_level(tracing::Level::DEBUG)
2040            .try_init();
2041
2042        let mut table = DataTable::new("test");
2043        table.add_column(DataColumn::new("id"));
2044        table.add_column(DataColumn::new("status"));
2045        table.add_column(DataColumn::new("priority"));
2046        table.add_column(DataColumn::new("assigned"));
2047
2048        // Add comprehensive test data
2049        table
2050            .add_row(DataRow::new(vec![
2051                DataValue::Integer(1),
2052                DataValue::String("Pending".to_string()),
2053                DataValue::String("High".to_string()),
2054                DataValue::String("John".to_string()),
2055            ]))
2056            .unwrap();
2057
2058        table
2059            .add_row(DataRow::new(vec![
2060                DataValue::Integer(2),
2061                DataValue::String("Complete".to_string()),
2062                DataValue::String("High".to_string()),
2063                DataValue::String("Jane".to_string()),
2064            ]))
2065            .unwrap();
2066
2067        table
2068            .add_row(DataRow::new(vec![
2069                DataValue::Integer(3),
2070                DataValue::String("Pending".to_string()),
2071                DataValue::String("Low".to_string()),
2072                DataValue::String("John".to_string()),
2073            ]))
2074            .unwrap();
2075
2076        table
2077            .add_row(DataRow::new(vec![
2078                DataValue::Integer(4),
2079                DataValue::String("In Progress".to_string()),
2080                DataValue::String("Medium".to_string()),
2081                DataValue::String("Jane".to_string()),
2082            ]))
2083            .unwrap();
2084
2085        let table = Arc::new(table);
2086        let engine = QueryEngine::new();
2087
2088        println!("\n=== Testing Complex Logical Expressions ===");
2089        println!("Table has {} rows", table.row_count());
2090        for i in 0..table.row_count() {
2091            let status = table.get_value(i, 1);
2092            let priority = table.get_value(i, 2);
2093            let assigned = table.get_value(i, 3);
2094            println!(
2095                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2096            );
2097        }
2098
2099        // Test complex AND/OR logic
2100        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2101        let result = engine.execute(
2102            table.clone(),
2103            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2104        );
2105        match result {
2106            Ok(view) => {
2107                println!(
2108                    "SUCCESS: Found {} rows with complex logic",
2109                    view.row_count()
2110                );
2111                assert_eq!(view.row_count(), 2); // Should find rows 1 and 3 (both Pending, one High priority, both assigned to John)
2112            }
2113            Err(e) => {
2114                panic!("Complex logic query failed: {e}");
2115            }
2116        }
2117
2118        // Test NOT with complex expressions
2119        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2120        let result = engine.execute(
2121            table.clone(),
2122            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2123        );
2124        match result {
2125            Ok(view) => {
2126                println!(
2127                    "SUCCESS: Found {} rows with NOT complex logic",
2128                    view.row_count()
2129                );
2130                assert_eq!(view.row_count(), 2); // Should find rows 1 (Pending+High) and 4 (In Progress+Medium)
2131            }
2132            Err(e) => {
2133                panic!("NOT complex logic query failed: {e}");
2134            }
2135        }
2136
2137        println!("\n=== Complex logical expressions test complete! ===");
2138    }
2139
2140    #[test]
2141    fn test_mixed_data_types_and_edge_cases() {
2142        // Initialize tracing for debug output
2143        let _ = tracing_subscriber::fmt()
2144            .with_max_level(tracing::Level::DEBUG)
2145            .try_init();
2146
2147        let mut table = DataTable::new("test");
2148        table.add_column(DataColumn::new("id"));
2149        table.add_column(DataColumn::new("value"));
2150        table.add_column(DataColumn::new("nullable_field"));
2151
2152        // Add test data with mixed types and edge cases
2153        table
2154            .add_row(DataRow::new(vec![
2155                DataValue::Integer(1),
2156                DataValue::String("123.45".to_string()),
2157                DataValue::String("present".to_string()),
2158            ]))
2159            .unwrap();
2160
2161        table
2162            .add_row(DataRow::new(vec![
2163                DataValue::Integer(2),
2164                DataValue::Float(678.90),
2165                DataValue::Null,
2166            ]))
2167            .unwrap();
2168
2169        table
2170            .add_row(DataRow::new(vec![
2171                DataValue::Integer(3),
2172                DataValue::Boolean(true),
2173                DataValue::String("also present".to_string()),
2174            ]))
2175            .unwrap();
2176
2177        table
2178            .add_row(DataRow::new(vec![
2179                DataValue::Integer(4),
2180                DataValue::String("false".to_string()),
2181                DataValue::Null,
2182            ]))
2183            .unwrap();
2184
2185        let table = Arc::new(table);
2186        let engine = QueryEngine::new();
2187
2188        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2189        println!("Table has {} rows", table.row_count());
2190        for i in 0..table.row_count() {
2191            let value = table.get_value(i, 1);
2192            let nullable = table.get_value(i, 2);
2193            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2194        }
2195
2196        // Test type coercion with boolean Contains
2197        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2198        let result = engine.execute(
2199            table.clone(),
2200            "SELECT * FROM test WHERE value.Contains('true')",
2201        );
2202        match result {
2203            Ok(view) => {
2204                println!(
2205                    "SUCCESS: Found {} rows with boolean coercion",
2206                    view.row_count()
2207                );
2208                assert_eq!(view.row_count(), 1); // Should find the boolean true row
2209            }
2210            Err(e) => {
2211                panic!("Boolean coercion query failed: {e}");
2212            }
2213        }
2214
2215        // Test multiple IN values with mixed types
2216        println!("\n--- Test: id IN (1, 3) ---");
2217        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2218        match result {
2219            Ok(view) => {
2220                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2221                assert_eq!(view.row_count(), 2); // Should find rows with id 1 and 3
2222            }
2223            Err(e) => {
2224                panic!("Multiple IN values query failed: {e}");
2225            }
2226        }
2227
2228        println!("\n=== Mixed data types test complete! ===");
2229    }
2230
2231    /// Test that aggregate-only queries return exactly one row (regression test)
2232    #[test]
2233    fn test_aggregate_only_single_row() {
2234        let table = create_test_stock_data();
2235        let engine = QueryEngine::new();
2236
2237        // Test query with multiple aggregates - should return exactly 1 row
2238        let result = engine
2239            .execute(
2240                table.clone(),
2241                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2242            )
2243            .expect("Query should succeed");
2244
2245        assert_eq!(
2246            result.row_count(),
2247            1,
2248            "Aggregate-only query should return exactly 1 row"
2249        );
2250        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2251
2252        // Verify the actual values are correct
2253        let source = result.source();
2254        let row = source.get_row(0).expect("Should have first row");
2255
2256        // COUNT(*) should be 5 (total rows)
2257        assert_eq!(row.values[0], DataValue::Integer(5));
2258
2259        // MIN should be 99.5
2260        assert_eq!(row.values[1], DataValue::Float(99.5));
2261
2262        // MAX should be 105.0
2263        assert_eq!(row.values[2], DataValue::Float(105.0));
2264
2265        // AVG should be approximately 102.4
2266        if let DataValue::Float(avg) = &row.values[3] {
2267            assert!(
2268                (avg - 102.4).abs() < 0.01,
2269                "Average should be approximately 102.4, got {}",
2270                avg
2271            );
2272        } else {
2273            panic!("AVG should return a Float value");
2274        }
2275    }
2276
2277    /// Test single aggregate function returns single row
2278    #[test]
2279    fn test_single_aggregate_single_row() {
2280        let table = create_test_stock_data();
2281        let engine = QueryEngine::new();
2282
2283        let result = engine
2284            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2285            .expect("Query should succeed");
2286
2287        assert_eq!(
2288            result.row_count(),
2289            1,
2290            "Single aggregate query should return exactly 1 row"
2291        );
2292        assert_eq!(result.column_count(), 1, "Should have 1 column");
2293
2294        let source = result.source();
2295        let row = source.get_row(0).expect("Should have first row");
2296        assert_eq!(row.values[0], DataValue::Integer(5));
2297    }
2298
2299    /// Test aggregate with WHERE clause filtering
2300    #[test]
2301    fn test_aggregate_with_where_single_row() {
2302        let table = create_test_stock_data();
2303        let engine = QueryEngine::new();
2304
2305        // Filter to only high-value stocks (>= 103.0) and aggregate
2306        let result = engine
2307            .execute(
2308                table.clone(),
2309                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2310            )
2311            .expect("Query should succeed");
2312
2313        assert_eq!(
2314            result.row_count(),
2315            1,
2316            "Filtered aggregate query should return exactly 1 row"
2317        );
2318        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2319
2320        let source = result.source();
2321        let row = source.get_row(0).expect("Should have first row");
2322
2323        // Should find 2 rows (103.5 and 105.0)
2324        assert_eq!(row.values[0], DataValue::Integer(2));
2325        assert_eq!(row.values[1], DataValue::Float(103.5)); // MIN
2326        assert_eq!(row.values[2], DataValue::Float(105.0)); // MAX
2327    }
2328
2329    #[test]
2330    fn test_not_in_parsing() {
2331        use crate::sql::recursive_parser::Parser;
2332
2333        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2334        println!("\n=== Testing NOT IN parsing ===");
2335        println!("Parsing query: {query}");
2336
2337        let mut parser = Parser::new(query);
2338        match parser.parse() {
2339            Ok(statement) => {
2340                println!("Parsed statement: {statement:#?}");
2341                if let Some(where_clause) = statement.where_clause {
2342                    println!("WHERE conditions: {:#?}", where_clause.conditions);
2343                    if let Some(first_condition) = where_clause.conditions.first() {
2344                        println!("First condition expression: {:#?}", first_condition.expr);
2345                    }
2346                }
2347            }
2348            Err(e) => {
2349                panic!("Parse error: {e}");
2350            }
2351        }
2352    }
2353
2354    /// Create test stock data for aggregate testing
2355    fn create_test_stock_data() -> Arc<DataTable> {
2356        let mut table = DataTable::new("stock");
2357
2358        table.add_column(DataColumn::new("symbol"));
2359        table.add_column(DataColumn::new("close"));
2360        table.add_column(DataColumn::new("volume"));
2361
2362        // Add 5 rows of test data
2363        let test_data = vec![
2364            ("AAPL", 99.5, 1000),
2365            ("AAPL", 101.2, 1500),
2366            ("AAPL", 103.5, 2000),
2367            ("AAPL", 105.0, 1200),
2368            ("AAPL", 102.8, 1800),
2369        ];
2370
2371        for (symbol, close, volume) in test_data {
2372            table
2373                .add_row(DataRow::new(vec![
2374                    DataValue::String(symbol.to_string()),
2375                    DataValue::Float(close),
2376                    DataValue::Integer(volume),
2377                ]))
2378                .expect("Should add row successfully");
2379        }
2380
2381        Arc::new(table)
2382    }
2383}
2384
2385#[cfg(test)]
2386#[path = "query_engine_tests.rs"]
2387mod query_engine_tests;