1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26 TableFunction,
27};
28
29#[derive(Clone)]
31pub struct QueryEngine {
32 case_insensitive: bool,
33 date_notation: String,
34 behavior_config: Option<BehaviorConfig>,
35}
36
37impl Default for QueryEngine {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl QueryEngine {
44 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 case_insensitive: false,
48 date_notation: get_date_notation(),
49 behavior_config: None,
50 }
51 }
52
53 #[must_use]
54 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55 let case_insensitive = config.case_insensitive_default;
56 let date_notation = get_date_notation();
58 Self {
59 case_insensitive,
60 date_notation,
61 behavior_config: Some(config),
62 }
63 }
64
65 #[must_use]
66 pub fn with_date_notation(_date_notation: String) -> Self {
67 Self {
68 case_insensitive: false,
69 date_notation: get_date_notation(), behavior_config: None,
71 }
72 }
73
74 #[must_use]
75 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76 Self {
77 case_insensitive,
78 date_notation: get_date_notation(),
79 behavior_config: None,
80 }
81 }
82
83 #[must_use]
84 pub fn with_case_insensitive_and_date_notation(
85 case_insensitive: bool,
86 _date_notation: String, ) -> Self {
88 Self {
89 case_insensitive,
90 date_notation: get_date_notation(), behavior_config: None,
92 }
93 }
94
95 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97 let columns = table.column_names();
98 let mut best_match: Option<(String, usize)> = None;
99
100 for col in columns {
101 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102 let max_distance = if name.len() > 10 { 3 } else { 2 };
105 if distance <= max_distance {
106 match &best_match {
107 None => best_match = Some((col, distance)),
108 Some((_, best_dist)) if distance < *best_dist => {
109 best_match = Some((col, distance));
110 }
111 _ => {}
112 }
113 }
114 }
115
116 best_match.map(|(name, _)| name)
117 }
118
119 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121 let len1 = s1.len();
122 let len2 = s2.len();
123 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125 for i in 0..=len1 {
126 matrix[i][0] = i;
127 }
128 for j in 0..=len2 {
129 matrix[0][j] = j;
130 }
131
132 for (i, c1) in s1.chars().enumerate() {
133 for (j, c2) in s2.chars().enumerate() {
134 let cost = usize::from(c1 != c2);
135 matrix[i + 1][j + 1] = std::cmp::min(
136 matrix[i][j + 1] + 1, std::cmp::min(
138 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
141 );
142 }
143 }
144
145 matrix[len1][len2]
146 }
147
148 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150 let (view, _plan) = self.execute_with_plan(table, sql)?;
151 Ok(view)
152 }
153
154 pub fn execute_statement(
156 &self,
157 table: Arc<DataTable>,
158 statement: SelectStatement,
159 ) -> Result<DataView> {
160 let mut cte_context = HashMap::new();
162 for cte in &statement.ctes {
163 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164 let cte_result = match &cte.cte_type {
166 CTEType::Standard(query) => {
167 let view = self.build_view_with_context(
169 table.clone(),
170 query.clone(),
171 &mut cte_context,
172 )?;
173
174 let mut materialized = self.materialize_view(view)?;
176
177 for column in materialized.columns_mut() {
179 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
180 column.source_table = Some(cte.name.clone());
181 }
182
183 DataView::new(Arc::new(materialized))
184 }
185 CTEType::Web(web_spec) => {
186 use crate::web::http_fetcher::WebDataFetcher;
188
189 let fetcher = WebDataFetcher::new()?;
190 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
191
192 for column in data_table.columns_mut() {
194 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195 column.source_table = Some(cte.name.clone());
196 }
197
198 DataView::new(Arc::new(data_table))
200 }
201 };
202 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204 debug!(
205 "QueryEngine: CTE '{}' pre-processed, stored in context",
206 cte.name
207 );
208 }
209
210 let mut subquery_executor =
212 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215 self.build_view_with_context(table, processed_statement, &mut cte_context)
217 }
218
219 pub fn execute_statement_with_cte_context(
221 &self,
222 table: Arc<DataTable>,
223 statement: SelectStatement,
224 cte_context: &HashMap<String, Arc<DataView>>,
225 ) -> Result<DataView> {
226 let mut local_context = cte_context.clone();
228
229 for cte in &statement.ctes {
231 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232 let cte_result = match &cte.cte_type {
233 CTEType::Standard(query) => {
234 let view = self.build_view_with_context(
235 table.clone(),
236 query.clone(),
237 &mut local_context,
238 )?;
239
240 let mut materialized = self.materialize_view(view)?;
242
243 for column in materialized.columns_mut() {
245 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246 column.source_table = Some(cte.name.clone());
247 }
248
249 DataView::new(Arc::new(materialized))
250 }
251 CTEType::Web(web_spec) => {
252 use crate::web::http_fetcher::WebDataFetcher;
254
255 let fetcher = WebDataFetcher::new()?;
256 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
257
258 for column in data_table.columns_mut() {
260 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261 column.source_table = Some(cte.name.clone());
262 }
263
264 DataView::new(Arc::new(data_table))
266 }
267 };
268 local_context.insert(cte.name.clone(), Arc::new(cte_result));
269 }
270
271 let mut subquery_executor =
273 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
274 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
275
276 self.build_view_with_context(table, processed_statement, &mut local_context)
278 }
279
280 pub fn execute_with_plan(
282 &self,
283 table: Arc<DataTable>,
284 sql: &str,
285 ) -> Result<(DataView, ExecutionPlan)> {
286 let mut plan_builder = ExecutionPlanBuilder::new();
287 let start_time = Instant::now();
288
289 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
291 plan_builder.add_detail(format!("Query: {}", sql));
292 let mut parser = Parser::new(sql);
293 let statement = parser
294 .parse()
295 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
296 plan_builder.add_detail(format!("Parsed successfully"));
297 if let Some(from) = &statement.from_table {
298 plan_builder.add_detail(format!("FROM: {}", from));
299 }
300 if statement.where_clause.is_some() {
301 plan_builder.add_detail("WHERE clause present".to_string());
302 }
303 plan_builder.end_step();
304
305 let mut cte_context = HashMap::new();
307
308 if !statement.ctes.is_empty() {
309 plan_builder.begin_step(
310 StepType::CTE,
311 format!("Process {} CTEs", statement.ctes.len()),
312 );
313
314 for cte in &statement.ctes {
315 let cte_start = Instant::now();
316 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
317
318 let cte_result = match &cte.cte_type {
319 CTEType::Standard(query) => {
320 if let Some(from) = &query.from_table {
322 plan_builder.add_detail(format!("Source: {}", from));
323 }
324 if query.where_clause.is_some() {
325 plan_builder.add_detail("Has WHERE clause".to_string());
326 }
327 if query.group_by.is_some() {
328 plan_builder.add_detail("Has GROUP BY".to_string());
329 }
330
331 debug!(
332 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
333 cte.name,
334 cte_context.keys().collect::<Vec<_>>()
335 );
336
337 let mut subquery_executor = SubqueryExecutor::with_cte_context(
340 self.clone(),
341 table.clone(),
342 cte_context.clone(),
343 );
344 let processed_query = subquery_executor.execute_subqueries(query)?;
345
346 let view = self.build_view_with_context(
347 table.clone(),
348 processed_query,
349 &mut cte_context,
350 )?;
351
352 let mut materialized = self.materialize_view(view)?;
354
355 for column in materialized.columns_mut() {
357 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
358 column.source_table = Some(cte.name.clone());
359 }
360
361 DataView::new(Arc::new(materialized))
362 }
363 CTEType::Web(web_spec) => {
364 plan_builder.add_detail(format!("URL: {}", web_spec.url));
365 if let Some(format) = &web_spec.format {
366 plan_builder.add_detail(format!("Format: {:?}", format));
367 }
368 if let Some(cache) = web_spec.cache_seconds {
369 plan_builder.add_detail(format!("Cache: {} seconds", cache));
370 }
371
372 use crate::web::http_fetcher::WebDataFetcher;
374
375 let fetcher = WebDataFetcher::new()?;
376 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
377
378 for column in data_table.columns_mut() {
380 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
381 column.source_table = Some(cte.name.clone());
382 }
383
384 DataView::new(Arc::new(data_table))
386 }
387 };
388
389 plan_builder.set_rows_out(cte_result.row_count());
391 plan_builder.add_detail(format!(
392 "Result: {} rows, {} columns",
393 cte_result.row_count(),
394 cte_result.column_count()
395 ));
396 plan_builder.add_detail(format!(
397 "Execution time: {:.3}ms",
398 cte_start.elapsed().as_secs_f64() * 1000.0
399 ));
400
401 debug!(
402 "QueryEngine: Storing CTE '{}' in context with {} rows",
403 cte.name,
404 cte_result.row_count()
405 );
406 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
407 plan_builder.end_step();
408 }
409
410 plan_builder.add_detail(format!(
411 "All {} CTEs cached in context",
412 statement.ctes.len()
413 ));
414 plan_builder.end_step();
415 }
416
417 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
419 let mut subquery_executor =
420 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
421
422 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
424 format!("{:?}", w).contains("Subquery")
426 });
427
428 if has_subqueries {
429 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
430 }
431
432 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
433
434 if has_subqueries {
435 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
436 } else {
437 plan_builder.add_detail("No subqueries to process".to_string());
438 }
439
440 plan_builder.end_step();
441 let result = self.build_view_with_context_and_plan(
442 table,
443 processed_statement,
444 &mut cte_context,
445 &mut plan_builder,
446 )?;
447
448 let total_duration = start_time.elapsed();
449 info!(
450 "Query execution complete: total={:?}, rows={}",
451 total_duration,
452 result.row_count()
453 );
454
455 let plan = plan_builder.build();
456 Ok((result, plan))
457 }
458
459 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
461 let mut cte_context = HashMap::new();
462 self.build_view_with_context(table, statement, &mut cte_context)
463 }
464
465 fn build_view_with_context(
467 &self,
468 table: Arc<DataTable>,
469 statement: SelectStatement,
470 cte_context: &mut HashMap<String, Arc<DataView>>,
471 ) -> Result<DataView> {
472 let mut dummy_plan = ExecutionPlanBuilder::new();
473 self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
474 }
475
476 fn build_view_with_context_and_plan(
478 &self,
479 table: Arc<DataTable>,
480 statement: SelectStatement,
481 cte_context: &mut HashMap<String, Arc<DataView>>,
482 plan: &mut ExecutionPlanBuilder,
483 ) -> Result<DataView> {
484 for cte in &statement.ctes {
486 if cte_context.contains_key(&cte.name) {
488 debug!(
489 "QueryEngine: CTE '{}' already in context, skipping",
490 cte.name
491 );
492 continue;
493 }
494
495 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
496 debug!(
497 "QueryEngine: Available CTEs for '{}': {:?}",
498 cte.name,
499 cte_context.keys().collect::<Vec<_>>()
500 );
501
502 let cte_result = match &cte.cte_type {
504 CTEType::Standard(query) => {
505 let view =
506 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
507
508 let mut materialized = self.materialize_view(view)?;
510
511 for column in materialized.columns_mut() {
513 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
514 column.source_table = Some(cte.name.clone());
515 }
516
517 DataView::new(Arc::new(materialized))
518 }
519 CTEType::Web(_web_spec) => {
520 return Err(anyhow!(
522 "Web CTEs should be processed in execute_select method"
523 ));
524 }
525 };
526
527 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
529 debug!(
530 "QueryEngine: CTE '{}' processed, stored in context",
531 cte.name
532 );
533 }
534
535 let source_table = if let Some(ref table_func) = statement.from_function {
537 debug!("QueryEngine: Processing table function...");
539 match table_func {
540 TableFunction::Generator { name, args } => {
541 use crate::sql::generators::GeneratorRegistry;
543
544 let registry = GeneratorRegistry::new();
546
547 if let Some(generator) = registry.get(name) {
548 let mut evaluator = ArithmeticEvaluator::with_date_notation(
550 &table,
551 self.date_notation.clone(),
552 );
553 let dummy_row = 0;
554
555 let mut evaluated_args = Vec::new();
556 for arg in args {
557 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
558 }
559
560 generator.generate(evaluated_args)?
562 } else {
563 return Err(anyhow!("Unknown generator function: {}", name));
564 }
565 }
566 }
567 } else if let Some(ref subquery) = statement.from_subquery {
568 debug!("QueryEngine: Processing FROM subquery...");
570 let subquery_result =
571 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
572
573 let materialized = self.materialize_view(subquery_result)?;
576 Arc::new(materialized)
577 } else if let Some(ref table_name) = statement.from_table {
578 if let Some(cte_view) = cte_context.get(table_name) {
580 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
581 let materialized = self.materialize_view((**cte_view).clone())?;
583 Arc::new(materialized)
584 } else {
585 table.clone()
587 }
588 } else {
589 table.clone()
591 };
592
593 let final_table = if !statement.joins.is_empty() {
595 plan.begin_step(
596 StepType::Join,
597 format!("Process {} JOINs", statement.joins.len()),
598 );
599 plan.set_rows_in(source_table.row_count());
600
601 let join_executor = HashJoinExecutor::new(self.case_insensitive);
602 let mut current_table = source_table;
603
604 for (idx, join_clause) in statement.joins.iter().enumerate() {
605 let join_start = Instant::now();
606 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
607 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
608 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
609 plan.add_detail(format!(
610 "Executing {:?} JOIN on {}",
611 join_clause.join_type, join_clause.condition.left_column
612 ));
613
614 let right_table = match &join_clause.table {
616 TableSource::Table(name) => {
617 if let Some(cte_view) = cte_context.get(name) {
619 let materialized = self.materialize_view((**cte_view).clone())?;
620 Arc::new(materialized)
621 } else {
622 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
625 }
626 }
627 TableSource::DerivedTable { query, alias: _ } => {
628 let subquery_result = self.build_view_with_context(
630 table.clone(),
631 *query.clone(),
632 cte_context,
633 )?;
634 let materialized = self.materialize_view(subquery_result)?;
635 Arc::new(materialized)
636 }
637 };
638
639 let joined = join_executor.execute_join(
641 current_table.clone(),
642 join_clause,
643 right_table.clone(),
644 )?;
645
646 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
647 plan.set_rows_out(joined.row_count());
648 plan.add_detail(format!("Result: {} rows", joined.row_count()));
649 plan.add_detail(format!(
650 "Join time: {:.3}ms",
651 join_start.elapsed().as_secs_f64() * 1000.0
652 ));
653 plan.end_step();
654
655 current_table = Arc::new(joined);
656 }
657
658 plan.set_rows_out(current_table.row_count());
659 plan.add_detail(format!(
660 "Final result after all joins: {} rows",
661 current_table.row_count()
662 ));
663 plan.end_step();
664 current_table
665 } else {
666 source_table
667 };
668
669 self.build_view_internal_with_plan(final_table, statement, plan)
671 }
672
673 fn materialize_view(&self, view: DataView) -> Result<DataTable> {
675 let source = view.source();
676 let mut result_table = DataTable::new("derived");
677
678 let visible_cols = view.visible_column_indices().to_vec();
680
681 for col_idx in &visible_cols {
683 let col = &source.columns[*col_idx];
684 let new_col = DataColumn {
685 name: col.name.clone(),
686 data_type: col.data_type.clone(),
687 nullable: col.nullable,
688 unique_values: col.unique_values,
689 null_count: col.null_count,
690 metadata: col.metadata.clone(),
691 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
694 result_table.add_column(new_col);
695 }
696
697 for row_idx in view.visible_row_indices() {
699 let source_row = &source.rows[*row_idx];
700 let mut new_row = DataRow { values: Vec::new() };
701
702 for col_idx in &visible_cols {
703 new_row.values.push(source_row.values[*col_idx].clone());
704 }
705
706 result_table.add_row(new_row);
707 }
708
709 Ok(result_table)
710 }
711
712 fn build_view_internal(
713 &self,
714 table: Arc<DataTable>,
715 statement: SelectStatement,
716 ) -> Result<DataView> {
717 let mut dummy_plan = ExecutionPlanBuilder::new();
718 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
719 }
720
721 fn build_view_internal_with_plan(
722 &self,
723 table: Arc<DataTable>,
724 statement: SelectStatement,
725 plan: &mut ExecutionPlanBuilder,
726 ) -> Result<DataView> {
727 debug!(
728 "QueryEngine::build_view - select_items: {:?}",
729 statement.select_items
730 );
731 debug!(
732 "QueryEngine::build_view - where_clause: {:?}",
733 statement.where_clause
734 );
735
736 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
738
739 if let Some(where_clause) = &statement.where_clause {
741 let total_rows = table.row_count();
742 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
743 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
744
745 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
746 plan.set_rows_in(total_rows);
747 plan.add_detail(format!("Input: {} rows", total_rows));
748
749 for condition in &where_clause.conditions {
751 plan.add_detail(format!("Condition: {:?}", condition.expr));
752 }
753
754 let filter_start = Instant::now();
755 let mut eval_context = EvaluationContext::new(self.case_insensitive);
757
758 let mut filtered_rows = Vec::new();
760 for row_idx in visible_rows {
761 if row_idx < 3 {
763 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
764 }
765 let mut evaluator =
766 RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
767 match evaluator.evaluate(where_clause, row_idx) {
768 Ok(result) => {
769 if row_idx < 3 {
770 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
771 }
772 if result {
773 filtered_rows.push(row_idx);
774 }
775 }
776 Err(e) => {
777 if row_idx < 3 {
778 debug!(
779 "QueryEngine: WHERE evaluation error for row {}: {}",
780 row_idx, e
781 );
782 }
783 return Err(e);
785 }
786 }
787 }
788
789 let (compilations, cache_hits) = eval_context.get_stats();
791 if compilations > 0 || cache_hits > 0 {
792 debug!(
793 "LIKE pattern cache: {} compilations, {} cache hits",
794 compilations, cache_hits
795 );
796 }
797 visible_rows = filtered_rows;
798 let filter_duration = filter_start.elapsed();
799 info!(
800 "WHERE clause filtering: {} rows -> {} rows in {:?}",
801 total_rows,
802 visible_rows.len(),
803 filter_duration
804 );
805
806 plan.set_rows_out(visible_rows.len());
807 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
808 plan.add_detail(format!(
809 "Filter time: {:.3}ms",
810 filter_duration.as_secs_f64() * 1000.0
811 ));
812 plan.end_step();
813 }
814
815 let mut view = DataView::new(table.clone());
817 view = view.with_rows(visible_rows);
818
819 if let Some(group_by_exprs) = &statement.group_by {
821 if !group_by_exprs.is_empty() {
822 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
823
824 plan.begin_step(
825 StepType::GroupBy,
826 format!("GROUP BY {} expressions", group_by_exprs.len()),
827 );
828 plan.set_rows_in(view.row_count());
829 plan.add_detail(format!("Input: {} rows", view.row_count()));
830 for expr in group_by_exprs {
831 plan.add_detail(format!("Group by: {:?}", expr));
832 }
833
834 let group_start = Instant::now();
835 view = self.apply_group_by(
836 view,
837 group_by_exprs,
838 &statement.select_items,
839 statement.having.as_ref(),
840 )?;
841
842 plan.set_rows_out(view.row_count());
843 plan.add_detail(format!("Output: {} groups", view.row_count()));
844 plan.add_detail(format!(
845 "Group time: {:.3}ms",
846 group_start.elapsed().as_secs_f64() * 1000.0
847 ));
848 plan.end_step();
849 }
850 } else {
851 if !statement.select_items.is_empty() {
853 let has_non_star_items = statement
855 .select_items
856 .iter()
857 .any(|item| !matches!(item, SelectItem::Star));
858
859 if has_non_star_items || statement.select_items.len() > 1 {
863 view = self.apply_select_items(view, &statement.select_items)?;
864 } else {
865 }
866 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
868 debug!("QueryEngine: Using legacy columns path");
869 let source_table = view.source();
872 let column_indices =
873 self.resolve_column_indices(source_table, &statement.columns)?;
874 view = view.with_columns(column_indices);
875 }
876 }
877
878 if statement.distinct {
880 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
881 plan.set_rows_in(view.row_count());
882 plan.add_detail(format!("Input: {} rows", view.row_count()));
883
884 let distinct_start = Instant::now();
885 view = self.apply_distinct(view)?;
886
887 plan.set_rows_out(view.row_count());
888 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
889 plan.add_detail(format!(
890 "Distinct time: {:.3}ms",
891 distinct_start.elapsed().as_secs_f64() * 1000.0
892 ));
893 plan.end_step();
894 }
895
896 if let Some(order_by_columns) = &statement.order_by {
898 if !order_by_columns.is_empty() {
899 plan.begin_step(
900 StepType::Sort,
901 format!("ORDER BY {} columns", order_by_columns.len()),
902 );
903 plan.set_rows_in(view.row_count());
904 for col in order_by_columns {
905 plan.add_detail(format!("{} {:?}", col.column, col.direction));
906 }
907
908 let sort_start = Instant::now();
909 view = self.apply_multi_order_by(view, order_by_columns)?;
910
911 plan.add_detail(format!(
912 "Sort time: {:.3}ms",
913 sort_start.elapsed().as_secs_f64() * 1000.0
914 ));
915 plan.end_step();
916 }
917 }
918
919 if let Some(limit) = statement.limit {
921 let offset = statement.offset.unwrap_or(0);
922 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
923 plan.set_rows_in(view.row_count());
924 if offset > 0 {
925 plan.add_detail(format!("OFFSET: {}", offset));
926 }
927 view = view.with_limit(limit, offset);
928 plan.set_rows_out(view.row_count());
929 plan.add_detail(format!("Output: {} rows", view.row_count()));
930 plan.end_step();
931 }
932
933 Ok(view)
934 }
935
936 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
938 let mut indices = Vec::new();
939 let table_columns = table.column_names();
940
941 for col_name in columns {
942 let index = table_columns
943 .iter()
944 .position(|c| c.eq_ignore_ascii_case(col_name))
945 .ok_or_else(|| {
946 let suggestion = self.find_similar_column(table, col_name);
947 match suggestion {
948 Some(similar) => anyhow::anyhow!(
949 "Column '{}' not found. Did you mean '{}'?",
950 col_name,
951 similar
952 ),
953 None => anyhow::anyhow!("Column '{}' not found", col_name),
954 }
955 })?;
956 indices.push(index);
957 }
958
959 Ok(indices)
960 }
961
962 fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
964 debug!(
965 "QueryEngine::apply_select_items - items: {:?}",
966 select_items
967 );
968 debug!(
969 "QueryEngine::apply_select_items - input view has {} rows",
970 view.row_count()
971 );
972
973 let has_aggregates = select_items.iter().any(|item| match item {
977 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
978 SelectItem::Column(_) => false,
979 SelectItem::Star => false,
980 });
981
982 let all_aggregate_compatible = select_items.iter().all(|item| match item {
983 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
984 SelectItem::Column(_) => false, SelectItem::Star => false, });
987
988 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
989 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
992 return self.apply_aggregate_select(view, select_items);
993 }
994
995 let has_computed_expressions = select_items
997 .iter()
998 .any(|item| matches!(item, SelectItem::Expression { .. }));
999
1000 debug!(
1001 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1002 has_computed_expressions
1003 );
1004
1005 if !has_computed_expressions {
1006 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1008 return Ok(view.with_columns(column_indices));
1009 }
1010
1011 let source_table = view.source();
1016 let visible_rows = view.visible_row_indices();
1017
1018 let mut computed_table = DataTable::new("query_result");
1021
1022 let mut expanded_items = Vec::new();
1024 for item in select_items {
1025 match item {
1026 SelectItem::Star => {
1027 for col_name in source_table.column_names() {
1029 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1030 col_name.to_string(),
1031 )));
1032 }
1033 }
1034 _ => expanded_items.push(item.clone()),
1035 }
1036 }
1037
1038 let mut column_name_counts: std::collections::HashMap<String, usize> =
1040 std::collections::HashMap::new();
1041
1042 for item in &expanded_items {
1043 let base_name = match item {
1044 SelectItem::Column(col_ref) => col_ref.name.clone(),
1045 SelectItem::Expression { alias, .. } => alias.clone(),
1046 SelectItem::Star => unreachable!("Star should have been expanded"),
1047 };
1048
1049 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1051 let column_name = if *count == 0 {
1052 base_name.clone()
1054 } else {
1055 format!("{base_name}_{count}")
1057 };
1058 *count += 1;
1059
1060 computed_table.add_column(DataColumn::new(&column_name));
1061 }
1062
1063 let mut evaluator =
1065 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1066
1067 for &row_idx in visible_rows {
1068 let mut row_values = Vec::new();
1069
1070 for item in &expanded_items {
1071 let value = match item {
1072 SelectItem::Column(col_ref) => {
1073 let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1075 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1077 let result = source_table.find_column_by_qualified_name(&qualified_name);
1078
1079 if result.is_none() {
1081 let available_qualified: Vec<String> = source_table.columns.iter()
1082 .filter_map(|c| c.qualified_name.clone())
1083 .collect();
1084 let available_simple: Vec<String> = source_table.columns.iter()
1085 .map(|c| c.name.clone())
1086 .collect();
1087 debug!(
1088 "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1089 qualified_name, available_qualified, available_simple
1090 );
1091 }
1093 result
1094 } else {
1095 source_table.get_column_index(&col_ref.name)
1097 }
1098 .ok_or_else(|| {
1099 let display_name = if let Some(prefix) = &col_ref.table_prefix {
1100 format!("{}.{}", prefix, col_ref.name)
1101 } else {
1102 col_ref.name.clone()
1103 };
1104 let suggestion = self.find_similar_column(source_table, &col_ref.name);
1105 match suggestion {
1106 Some(similar) => anyhow::anyhow!(
1107 "Column '{}' not found. Did you mean '{}'?",
1108 display_name,
1109 similar
1110 ),
1111 None => anyhow::anyhow!("Column '{}' not found", display_name),
1112 }
1113 })?;
1114 let row = source_table
1115 .get_row(row_idx)
1116 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1117 row.get(col_idx)
1118 .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1119 .clone()
1120 }
1121 SelectItem::Expression { expr, .. } => {
1122 evaluator.evaluate(expr, row_idx)?
1124 }
1125 SelectItem::Star => unreachable!("Star should have been expanded"),
1126 };
1127 row_values.push(value);
1128 }
1129
1130 computed_table
1131 .add_row(DataRow::new(row_values))
1132 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1133 }
1134
1135 Ok(DataView::new(Arc::new(computed_table)))
1138 }
1139
1140 fn apply_aggregate_select(
1142 &self,
1143 view: DataView,
1144 select_items: &[SelectItem],
1145 ) -> Result<DataView> {
1146 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1147
1148 let source_table = view.source();
1149 let mut result_table = DataTable::new("aggregate_result");
1150
1151 for item in select_items {
1153 let column_name = match item {
1154 SelectItem::Expression { alias, .. } => alias.clone(),
1155 _ => unreachable!("Should only have expressions in aggregate-only query"),
1156 };
1157 result_table.add_column(DataColumn::new(&column_name));
1158 }
1159
1160 let visible_rows = view.visible_row_indices().to_vec();
1162 let mut evaluator =
1163 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1164 .with_visible_rows(visible_rows);
1165
1166 let mut row_values = Vec::new();
1168 for item in select_items {
1169 match item {
1170 SelectItem::Expression { expr, .. } => {
1171 let value = evaluator.evaluate(expr, 0)?;
1174 row_values.push(value);
1175 }
1176 _ => unreachable!("Should only have expressions in aggregate-only query"),
1177 }
1178 }
1179
1180 result_table
1182 .add_row(DataRow::new(row_values))
1183 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1184
1185 Ok(DataView::new(Arc::new(result_table)))
1186 }
1187
1188 fn resolve_select_columns(
1190 &self,
1191 table: &DataTable,
1192 select_items: &[SelectItem],
1193 ) -> Result<Vec<usize>> {
1194 let mut indices = Vec::new();
1195 let table_columns = table.column_names();
1196
1197 for item in select_items {
1198 match item {
1199 SelectItem::Column(col_ref) => {
1200 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1202 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1204 table.find_column_by_qualified_name(&qualified_name)
1205 .ok_or_else(|| {
1206 let has_qualified = table.columns.iter()
1208 .any(|c| c.qualified_name.is_some());
1209 if !has_qualified {
1210 anyhow::anyhow!(
1211 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1212 qualified_name, table_prefix
1213 )
1214 } else {
1215 anyhow::anyhow!("Column '{}' not found", qualified_name)
1216 }
1217 })?
1218 } else {
1219 table_columns
1221 .iter()
1222 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1223 .ok_or_else(|| {
1224 let suggestion = self.find_similar_column(table, &col_ref.name);
1225 match suggestion {
1226 Some(similar) => anyhow::anyhow!(
1227 "Column '{}' not found. Did you mean '{}'?",
1228 col_ref.name,
1229 similar
1230 ),
1231 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1232 }
1233 })?
1234 };
1235 indices.push(index);
1236 }
1237 SelectItem::Star => {
1238 for i in 0..table_columns.len() {
1240 indices.push(i);
1241 }
1242 }
1243 SelectItem::Expression { .. } => {
1244 return Err(anyhow::anyhow!(
1245 "Computed expressions require new table creation"
1246 ));
1247 }
1248 }
1249 }
1250
1251 Ok(indices)
1252 }
1253
1254 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1256 use std::collections::HashSet;
1257
1258 let source = view.source();
1259 let visible_cols = view.visible_column_indices();
1260 let visible_rows = view.visible_row_indices();
1261
1262 let mut seen_rows = HashSet::new();
1264 let mut unique_row_indices = Vec::new();
1265
1266 for &row_idx in visible_rows {
1267 let mut row_key = Vec::new();
1269 for &col_idx in visible_cols {
1270 let value = source
1271 .get_value(row_idx, col_idx)
1272 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1273 row_key.push(format!("{:?}", value));
1275 }
1276
1277 if seen_rows.insert(row_key) {
1279 unique_row_indices.push(row_idx);
1281 }
1282 }
1283
1284 Ok(view.with_rows(unique_row_indices))
1286 }
1287
1288 fn apply_multi_order_by(
1290 &self,
1291 mut view: DataView,
1292 order_by_columns: &[OrderByColumn],
1293 ) -> Result<DataView> {
1294 let mut sort_columns = Vec::new();
1296
1297 for order_col in order_by_columns {
1298 let col_index = view
1302 .source()
1303 .get_column_index(&order_col.column)
1304 .ok_or_else(|| {
1305 let suggestion = self.find_similar_column(view.source(), &order_col.column);
1307 match suggestion {
1308 Some(similar) => anyhow::anyhow!(
1309 "Column '{}' not found. Did you mean '{}'?",
1310 order_col.column,
1311 similar
1312 ),
1313 None => {
1314 let available_cols = view.source().column_names().join(", ");
1316 anyhow::anyhow!(
1317 "Column '{}' not found. Available columns: {}",
1318 order_col.column,
1319 available_cols
1320 )
1321 }
1322 }
1323 })?;
1324
1325 let ascending = matches!(order_col.direction, SortDirection::Asc);
1326 sort_columns.push((col_index, ascending));
1327 }
1328
1329 view.apply_multi_sort(&sort_columns)?;
1331 Ok(view)
1332 }
1333
1334 fn apply_group_by(
1336 &self,
1337 view: DataView,
1338 group_by_exprs: &[SqlExpression],
1339 select_items: &[SelectItem],
1340 having: Option<&SqlExpression>,
1341 ) -> Result<DataView> {
1342 self.apply_group_by_expressions(
1344 view,
1345 group_by_exprs,
1346 select_items,
1347 having,
1348 self.case_insensitive,
1349 self.date_notation.clone(),
1350 )
1351 }
1352
1353 pub fn estimate_group_cardinality(
1356 &self,
1357 view: &DataView,
1358 group_by_exprs: &[SqlExpression],
1359 ) -> usize {
1360 let row_count = view.get_visible_rows().len();
1362 if row_count <= 100 {
1363 return row_count;
1364 }
1365
1366 let sample_size = min(1000, row_count / 10).max(100);
1368 let mut seen = FxHashSet::default();
1369
1370 let visible_rows = view.get_visible_rows();
1371 for (i, &row_idx) in visible_rows.iter().enumerate() {
1372 if i >= sample_size {
1373 break;
1374 }
1375
1376 let mut key_values = Vec::new();
1378 for expr in group_by_exprs {
1379 let mut evaluator = ArithmeticEvaluator::new(view.source());
1380 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1381 key_values.push(value);
1382 }
1383
1384 seen.insert(key_values);
1385 }
1386
1387 let sample_cardinality = seen.len();
1389 let estimated = (sample_cardinality * row_count) / sample_size;
1390
1391 estimated.min(row_count).max(sample_cardinality)
1393 }
1394}
1395
1396#[cfg(test)]
1397mod tests {
1398 use super::*;
1399 use crate::data::datatable::{DataColumn, DataRow, DataValue};
1400
1401 fn create_test_table() -> Arc<DataTable> {
1402 let mut table = DataTable::new("test");
1403
1404 table.add_column(DataColumn::new("id"));
1406 table.add_column(DataColumn::new("name"));
1407 table.add_column(DataColumn::new("age"));
1408
1409 table
1411 .add_row(DataRow::new(vec![
1412 DataValue::Integer(1),
1413 DataValue::String("Alice".to_string()),
1414 DataValue::Integer(30),
1415 ]))
1416 .unwrap();
1417
1418 table
1419 .add_row(DataRow::new(vec![
1420 DataValue::Integer(2),
1421 DataValue::String("Bob".to_string()),
1422 DataValue::Integer(25),
1423 ]))
1424 .unwrap();
1425
1426 table
1427 .add_row(DataRow::new(vec![
1428 DataValue::Integer(3),
1429 DataValue::String("Charlie".to_string()),
1430 DataValue::Integer(35),
1431 ]))
1432 .unwrap();
1433
1434 Arc::new(table)
1435 }
1436
// `SELECT *` must expose every row and every column of the fixture.
#[test]
fn test_select_all() {
    let engine = QueryEngine::new();
    let fixture = create_test_table();

    let view = engine.execute(fixture, "SELECT * FROM users").unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 3);
}
1448
// Projecting two named columns keeps all rows but narrows the view to two columns.
#[test]
fn test_select_columns() {
    let engine = QueryEngine::new();
    let fixture = create_test_table();

    let view = engine
        .execute(fixture, "SELECT name, age FROM users")
        .unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 2);
}
1460
// `LIMIT 2` must cap the three-row fixture at two rows.
#[test]
fn test_select_with_limit() {
    let engine = QueryEngine::new();
    let fixture = create_test_table();

    let view = engine
        .execute(fixture, "SELECT * FROM users LIMIT 2")
        .unwrap();

    assert_eq!(view.row_count(), 2);
}
1471
// Exercises `.Contains()` in WHERE clauses against a string column and a
// float column.
#[test]
fn test_type_coercion_contains() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "price"] {
        table.add_column(DataColumn::new(column));
    }

    let orders = [
        (1, "Pending", 99.99),
        (2, "Confirmed", 150.50),
        (3, "Pending", 75.00),
    ];
    for (id, status, price) in orders {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::Float(price),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing WHERE clause with Contains ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    println!("\n--- Test 1: status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("Query failed: {e}"));
    println!("SUCCESS: Found {} matching rows", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test 2: price.Contains('9') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        )
        .unwrap_or_else(|e| panic!("Numeric coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} matching rows with price containing '9'",
        view.row_count()
    );
    assert!(view.row_count() >= 1);

    println!("\n=== All tests passed! ===");
}
1557
// `NOT IN ('CA')` must drop the single CA row and keep the other two.
#[test]
fn test_not_in_clause() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }

    for (id, country) in [(1, "CA"), (2, "US"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing NOT IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country NOT IN ('CA') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('CA')",
        )
        .unwrap_or_else(|e| panic!("NOT IN query failed: {e}"));
    println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== NOT IN test complete! ===");
}
1619
// Compares IN / NOT IN matching with a case-insensitive engine against the
// default engine on mixed-case country codes.
#[test]
fn test_case_insensitive_in_and_not_in() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }

    for (id, country) in [(1, "CA"), (2, "us"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);

    println!("\n=== Testing Case-Insensitive IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
    let engine = QueryEngine::with_case_insensitive(true);
    let view = engine
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-insensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('ca')",
        )
        .unwrap_or_else(|e| panic!("Case-insensitive NOT IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
    let engine_case_sensitive = QueryEngine::new();
    let view = engine_case_sensitive
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-sensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 0);

    println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
}
1718
// Two parenthesised AND groups joined by OR should select exactly two of the
// four status/priority combinations.
#[test]
#[ignore = "Parentheses in WHERE clause not yet implemented"]
fn test_parentheses_in_where_clause() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "priority"] {
        table.add_column(DataColumn::new(column));
    }

    let tickets = [
        (1, "Pending", "High"),
        (2, "Complete", "High"),
        (3, "Pending", "Low"),
        (4, "Complete", "Low"),
    ];
    for (id, status, priority) in tickets {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::String(priority.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Parentheses in WHERE clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        println!("Row {i}: status = {status:?}, priority = {priority:?}");
    }

    println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("Parentheses query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with parenthetical logic",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n=== Parentheses test complete! ===");
}
1797
// `.Contains()` on numeric columns: a price column mixing floats and an
// integer, and an all-integer quantity column.
#[test]
#[ignore = "Numeric type coercion needs fixing"]
fn test_numeric_type_coercion() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "price", "quantity"] {
        table.add_column(DataColumn::new(column));
    }

    // The price of the third row is deliberately an integer value.
    let items = [
        (1, DataValue::Float(99.50), 100),
        (2, DataValue::Float(150.0), 200),
        (3, DataValue::Integer(75), 50),
    ];
    for (id, price, quantity) in items {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                price,
                DataValue::Integer(quantity),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Numeric Type Coercion ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let price = table.get_value(i, 1);
        let quantity = table.get_value(i, 2);
        println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
    }

    println!("\n--- Test: price.Contains('.') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('.')",
        )
        .unwrap_or_else(|e| panic!("Numeric Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with decimal points in price",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: quantity.Contains('0') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE quantity.Contains('0')",
        )
        .unwrap_or_else(|e| panic!("Integer Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with '0' in quantity",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n=== Numeric type coercion test complete! ===");
}
1887
// Date strings compared against `DateTime(2025,1,1)`: two of the three rows
// are expected to match.
#[test]
fn test_datetime_comparisons() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "created_date"] {
        table.add_column(DataColumn::new(column));
    }

    for (id, created) in [(1, "2024-12-15"), (2, "2025-01-15"), (3, "2025-02-15")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(created.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing DateTime Comparisons ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let date = table.get_value(i, 1);
        println!("Row {i}: created_date = {date:?}");
    }

    println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
        )
        .unwrap_or_else(|e| panic!("DateTime comparison query failed: {e}"));
    println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== DateTime comparison test complete! ===");
}
1949
// `NOT` applied to `.Contains()` / `.StartsWith()` with a case-insensitive
// engine: only the "Complete" row should survive each filter.
#[test]
fn test_not_with_method_calls() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status"] {
        table.add_column(DataColumn::new(column));
    }

    let statuses = [(1, "Pending Review"), (2, "Complete"), (3, "Pending Approval")];
    for (id, status) in statuses {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::with_case_insensitive(true);

    println!("\n=== Testing NOT with Method Calls ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    println!("\n--- Test: NOT status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("NOT Contains query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT containing 'pend'",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: NOT status.StartsWith('Pending') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
        )
        .unwrap_or_else(|e| panic!("NOT StartsWith query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows NOT starting with 'Pending'",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n=== NOT with method calls test complete! ===");
}
2033
// Mixes AND, OR, NOT and parentheses over a four-column ticket table.
#[test]
#[ignore = "Complex logical expressions with parentheses not yet implemented"]
fn test_complex_logical_expressions() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "priority", "assigned"] {
        table.add_column(DataColumn::new(column));
    }

    let tickets = [
        (1, "Pending", "High", "John"),
        (2, "Complete", "High", "Jane"),
        (3, "Pending", "Low", "John"),
        (4, "In Progress", "Medium", "Jane"),
    ];
    for (id, status, priority, assigned) in tickets {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::String(priority.to_string()),
                DataValue::String(assigned.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Complex Logical Expressions ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        let priority = table.get_value(i, 2);
        let assigned = table.get_value(i, 3);
        println!(
            "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
        );
    }

    println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
        )
        .unwrap_or_else(|e| panic!("Complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with complex logic",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
        )
        .unwrap_or_else(|e| panic!("NOT complex logic query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with NOT complex logic",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n=== Complex logical expressions test complete! ===");
}
2139
// A `value` column holding strings, a float and a boolean, plus a column
// with nulls, queried with `.Contains()` and a numeric IN list.
#[test]
fn test_mixed_data_types_and_edge_cases() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "value", "nullable_field"] {
        table.add_column(DataColumn::new(column));
    }

    let rows = vec![
        vec![
            DataValue::Integer(1),
            DataValue::String("123.45".to_string()),
            DataValue::String("present".to_string()),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(678.90),
            DataValue::Null,
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Boolean(true),
            DataValue::String("also present".to_string()),
        ],
        vec![
            DataValue::Integer(4),
            DataValue::String("false".to_string()),
            DataValue::Null,
        ],
    ];
    for values in rows {
        table.add_row(DataRow::new(values)).unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Mixed Data Types and Edge Cases ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let value = table.get_value(i, 1);
        let nullable = table.get_value(i, 2);
        println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
    }

    println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE value.Contains('true')",
        )
        .unwrap_or_else(|e| panic!("Boolean coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows with boolean coercion",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: id IN (1, 3) ---");
    let view = engine
        .execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)")
        .unwrap_or_else(|e| panic!("Multiple IN values query failed: {e}"));
    println!("SUCCESS: Found {} rows with IN clause", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== Mixed data types test complete! ===");
}
2230
// A SELECT list made only of aggregates must collapse the stock fixture
// into a single row holding COUNT, MIN, MAX and AVG.
#[test]
fn test_aggregate_only_single_row() {
    let engine = QueryEngine::new();
    let stock = create_test_stock_data();

    let result = engine
        .execute(
            stock,
            "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
        )
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Aggregate-only query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    assert_eq!(row.values[0], DataValue::Integer(5));
    assert_eq!(row.values[1], DataValue::Float(99.5));
    assert_eq!(row.values[2], DataValue::Float(105.0));

    // AVG is compared with a tolerance because it is a floating-point mean.
    match &row.values[3] {
        DataValue::Float(avg) => assert!(
            (avg - 102.4).abs() < 0.01,
            "Average should be approximately 102.4, got {}",
            avg
        ),
        _ => panic!("AVG should return a Float value"),
    }
}
2276
// A lone `COUNT(*)` must yield one row with one column holding the row count.
#[test]
fn test_single_aggregate_single_row() {
    let engine = QueryEngine::new();
    let stock = create_test_stock_data();

    let result = engine
        .execute(stock, "SELECT COUNT(*) FROM stock")
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Single aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 1, "Should have 1 column");

    let source = result.source();
    let first_row = source.get_row(0).expect("Should have first row");
    assert_eq!(first_row.values[0], DataValue::Integer(5));
}
2298
// Aggregates must be computed over the rows that pass the WHERE filter only
// (the two closes at or above 103.0).
#[test]
fn test_aggregate_with_where_single_row() {
    let engine = QueryEngine::new();
    let stock = create_test_stock_data();

    let result = engine
        .execute(
            stock,
            "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
        )
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Filtered aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

    let source = result.source();
    let first_row = source.get_row(0).expect("Should have first row");

    assert_eq!(first_row.values[0], DataValue::Integer(2));
    assert_eq!(first_row.values[1], DataValue::Float(103.5));
    assert_eq!(first_row.values[2], DataValue::Float(105.0));
}
2328
// Parses a NOT IN query and dumps the resulting AST; the test only fails if
// parsing itself fails.
#[test]
fn test_not_in_parsing() {
    use crate::sql::recursive_parser::Parser;

    let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
    println!("\n=== Testing NOT IN parsing ===");
    println!("Parsing query: {query}");

    let mut parser = Parser::new(query);
    let statement = parser
        .parse()
        .unwrap_or_else(|e| panic!("Parse error: {e}"));

    println!("Parsed statement: {statement:#?}");
    if let Some(where_clause) = statement.where_clause {
        println!("WHERE conditions: {:#?}", where_clause.conditions);
        if let Some(first_condition) = where_clause.conditions.first() {
            println!("First condition expression: {:#?}", first_condition.expr);
        }
    }
}
2353
2354 fn create_test_stock_data() -> Arc<DataTable> {
2356 let mut table = DataTable::new("stock");
2357
2358 table.add_column(DataColumn::new("symbol"));
2359 table.add_column(DataColumn::new("close"));
2360 table.add_column(DataColumn::new("volume"));
2361
2362 let test_data = vec![
2364 ("AAPL", 99.5, 1000),
2365 ("AAPL", 101.2, 1500),
2366 ("AAPL", 103.5, 2000),
2367 ("AAPL", 105.0, 1200),
2368 ("AAPL", 102.8, 1800),
2369 ];
2370
2371 for (symbol, close, volume) in test_data {
2372 table
2373 .add_row(DataRow::new(vec![
2374 DataValue::String(symbol.to_string()),
2375 DataValue::Float(close),
2376 DataValue::Integer(volume),
2377 ]))
2378 .expect("Should add row successfully");
2379 }
2380
2381 Arc::new(table)
2382 }
2383}
2384
2385#[cfg(test)]
2386#[path = "query_engine_tests.rs"]
2387mod query_engine_tests;