1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
20use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
21use crate::sql::parser::ast::ColumnRef;
22use crate::sql::parser::ast::TableSource;
23use crate::sql::recursive_parser::{
24 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
25 TableFunction,
26};
27
28#[derive(Clone)]
30pub struct QueryEngine {
31 case_insensitive: bool,
32 date_notation: String,
33 _behavior_config: Option<BehaviorConfig>,
34}
35
36impl Default for QueryEngine {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl QueryEngine {
43 #[must_use]
44 pub fn new() -> Self {
45 Self {
46 case_insensitive: false,
47 date_notation: get_date_notation(),
48 _behavior_config: None,
49 }
50 }
51
52 #[must_use]
53 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
54 let case_insensitive = config.case_insensitive_default;
55 let date_notation = get_date_notation();
57 Self {
58 case_insensitive,
59 date_notation,
60 _behavior_config: Some(config),
61 }
62 }
63
64 #[must_use]
65 pub fn with_date_notation(_date_notation: String) -> Self {
66 Self {
67 case_insensitive: false,
68 date_notation: get_date_notation(), _behavior_config: None,
70 }
71 }
72
73 #[must_use]
74 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
75 Self {
76 case_insensitive,
77 date_notation: get_date_notation(),
78 _behavior_config: None,
79 }
80 }
81
82 #[must_use]
83 pub fn with_case_insensitive_and_date_notation(
84 case_insensitive: bool,
85 _date_notation: String, ) -> Self {
87 Self {
88 case_insensitive,
89 date_notation: get_date_notation(), _behavior_config: None,
91 }
92 }
93
94 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
96 let columns = table.column_names();
97 let mut best_match: Option<(String, usize)> = None;
98
99 for col in columns {
100 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
101 let max_distance = if name.len() > 10 { 3 } else { 2 };
104 if distance <= max_distance {
105 match &best_match {
106 None => best_match = Some((col, distance)),
107 Some((_, best_dist)) if distance < *best_dist => {
108 best_match = Some((col, distance));
109 }
110 _ => {}
111 }
112 }
113 }
114
115 best_match.map(|(name, _)| name)
116 }
117
118 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
120 let len1 = s1.len();
121 let len2 = s2.len();
122 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
123
124 for i in 0..=len1 {
125 matrix[i][0] = i;
126 }
127 for j in 0..=len2 {
128 matrix[0][j] = j;
129 }
130
131 for (i, c1) in s1.chars().enumerate() {
132 for (j, c2) in s2.chars().enumerate() {
133 let cost = usize::from(c1 != c2);
134 matrix[i + 1][j + 1] = std::cmp::min(
135 matrix[i][j + 1] + 1, std::cmp::min(
137 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
140 );
141 }
142 }
143
144 matrix[len1][len2]
145 }
146
147 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
149 let (view, _plan) = self.execute_with_plan(table, sql)?;
150 Ok(view)
151 }
152
153 pub fn execute_statement(
155 &self,
156 table: Arc<DataTable>,
157 statement: SelectStatement,
158 ) -> Result<DataView> {
159 let mut cte_context = HashMap::new();
161 for cte in &statement.ctes {
162 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
163 let cte_result = match &cte.cte_type {
165 CTEType::Standard(query) => {
166 let view = self.build_view_with_context(
168 table.clone(),
169 query.clone(),
170 &mut cte_context,
171 )?;
172
173 let mut materialized = self.materialize_view(view)?;
175
176 for column in materialized.columns_mut() {
178 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
179 column.source_table = Some(cte.name.clone());
180 }
181
182 DataView::new(Arc::new(materialized))
183 }
184 CTEType::Web(web_spec) => {
185 use crate::web::http_fetcher::WebDataFetcher;
187
188 let fetcher = WebDataFetcher::new()?;
189 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
191
192 for column in data_table.columns_mut() {
194 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195 column.source_table = Some(cte.name.clone());
196 }
197
198 DataView::new(Arc::new(data_table))
200 }
201 };
202 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204 debug!(
205 "QueryEngine: CTE '{}' pre-processed, stored in context",
206 cte.name
207 );
208 }
209
210 let mut subquery_executor =
212 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215 self.build_view_with_context(table, processed_statement, &mut cte_context)
217 }
218
219 pub fn execute_statement_with_cte_context(
221 &self,
222 table: Arc<DataTable>,
223 statement: SelectStatement,
224 cte_context: &HashMap<String, Arc<DataView>>,
225 ) -> Result<DataView> {
226 let mut local_context = cte_context.clone();
228
229 for cte in &statement.ctes {
231 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232 let cte_result = match &cte.cte_type {
233 CTEType::Standard(query) => {
234 let view = self.build_view_with_context(
235 table.clone(),
236 query.clone(),
237 &mut local_context,
238 )?;
239
240 let mut materialized = self.materialize_view(view)?;
242
243 for column in materialized.columns_mut() {
245 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246 column.source_table = Some(cte.name.clone());
247 }
248
249 DataView::new(Arc::new(materialized))
250 }
251 CTEType::Web(web_spec) => {
252 use crate::web::http_fetcher::WebDataFetcher;
254
255 let fetcher = WebDataFetcher::new()?;
256 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
258
259 for column in data_table.columns_mut() {
261 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
262 column.source_table = Some(cte.name.clone());
263 }
264
265 DataView::new(Arc::new(data_table))
267 }
268 };
269 local_context.insert(cte.name.clone(), Arc::new(cte_result));
270 }
271
272 let mut subquery_executor =
274 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
275 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
276
277 self.build_view_with_context(table, processed_statement, &mut local_context)
279 }
280
281 pub fn execute_with_plan(
283 &self,
284 table: Arc<DataTable>,
285 sql: &str,
286 ) -> Result<(DataView, ExecutionPlan)> {
287 let mut plan_builder = ExecutionPlanBuilder::new();
288 let start_time = Instant::now();
289
290 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
292 plan_builder.add_detail(format!("Query: {}", sql));
293 let mut parser = Parser::new(sql);
294 let statement = parser
295 .parse()
296 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
297 plan_builder.add_detail(format!("Parsed successfully"));
298 if let Some(from) = &statement.from_table {
299 plan_builder.add_detail(format!("FROM: {}", from));
300 }
301 if statement.where_clause.is_some() {
302 plan_builder.add_detail("WHERE clause present".to_string());
303 }
304 plan_builder.end_step();
305
306 let mut cte_context = HashMap::new();
308
309 if !statement.ctes.is_empty() {
310 plan_builder.begin_step(
311 StepType::CTE,
312 format!("Process {} CTEs", statement.ctes.len()),
313 );
314
315 for cte in &statement.ctes {
316 let cte_start = Instant::now();
317 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
318
319 let cte_result = match &cte.cte_type {
320 CTEType::Standard(query) => {
321 if let Some(from) = &query.from_table {
323 plan_builder.add_detail(format!("Source: {}", from));
324 }
325 if query.where_clause.is_some() {
326 plan_builder.add_detail("Has WHERE clause".to_string());
327 }
328 if query.group_by.is_some() {
329 plan_builder.add_detail("Has GROUP BY".to_string());
330 }
331
332 debug!(
333 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
334 cte.name,
335 cte_context.keys().collect::<Vec<_>>()
336 );
337
338 let mut subquery_executor = SubqueryExecutor::with_cte_context(
341 self.clone(),
342 table.clone(),
343 cte_context.clone(),
344 );
345 let processed_query = subquery_executor.execute_subqueries(query)?;
346
347 let view = self.build_view_with_context(
348 table.clone(),
349 processed_query,
350 &mut cte_context,
351 )?;
352
353 let mut materialized = self.materialize_view(view)?;
355
356 for column in materialized.columns_mut() {
358 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
359 column.source_table = Some(cte.name.clone());
360 }
361
362 DataView::new(Arc::new(materialized))
363 }
364 CTEType::Web(web_spec) => {
365 plan_builder.add_detail(format!("URL: {}", web_spec.url));
366 if let Some(format) = &web_spec.format {
367 plan_builder.add_detail(format!("Format: {:?}", format));
368 }
369 if let Some(cache) = web_spec.cache_seconds {
370 plan_builder.add_detail(format!("Cache: {} seconds", cache));
371 }
372
373 use crate::web::http_fetcher::WebDataFetcher;
375
376 let fetcher = WebDataFetcher::new()?;
377 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
379
380 for column in data_table.columns_mut() {
382 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
383 column.source_table = Some(cte.name.clone());
384 }
385
386 DataView::new(Arc::new(data_table))
388 }
389 };
390
391 plan_builder.set_rows_out(cte_result.row_count());
393 plan_builder.add_detail(format!(
394 "Result: {} rows, {} columns",
395 cte_result.row_count(),
396 cte_result.column_count()
397 ));
398 plan_builder.add_detail(format!(
399 "Execution time: {:.3}ms",
400 cte_start.elapsed().as_secs_f64() * 1000.0
401 ));
402
403 debug!(
404 "QueryEngine: Storing CTE '{}' in context with {} rows",
405 cte.name,
406 cte_result.row_count()
407 );
408 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
409 plan_builder.end_step();
410 }
411
412 plan_builder.add_detail(format!(
413 "All {} CTEs cached in context",
414 statement.ctes.len()
415 ));
416 plan_builder.end_step();
417 }
418
419 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
421 let mut subquery_executor =
422 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
423
424 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
426 format!("{:?}", w).contains("Subquery")
428 });
429
430 if has_subqueries {
431 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
432 }
433
434 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
435
436 if has_subqueries {
437 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
438 } else {
439 plan_builder.add_detail("No subqueries to process".to_string());
440 }
441
442 plan_builder.end_step();
443 let result = self.build_view_with_context_and_plan(
444 table,
445 processed_statement,
446 &mut cte_context,
447 &mut plan_builder,
448 )?;
449
450 let total_duration = start_time.elapsed();
451 info!(
452 "Query execution complete: total={:?}, rows={}",
453 total_duration,
454 result.row_count()
455 );
456
457 let plan = plan_builder.build();
458 Ok((result, plan))
459 }
460
461 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
463 let mut cte_context = HashMap::new();
464 self.build_view_with_context(table, statement, &mut cte_context)
465 }
466
467 fn build_view_with_context(
469 &self,
470 table: Arc<DataTable>,
471 statement: SelectStatement,
472 cte_context: &mut HashMap<String, Arc<DataView>>,
473 ) -> Result<DataView> {
474 let mut dummy_plan = ExecutionPlanBuilder::new();
475 self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
476 }
477
478 fn build_view_with_context_and_plan(
480 &self,
481 table: Arc<DataTable>,
482 statement: SelectStatement,
483 cte_context: &mut HashMap<String, Arc<DataView>>,
484 plan: &mut ExecutionPlanBuilder,
485 ) -> Result<DataView> {
486 for cte in &statement.ctes {
488 if cte_context.contains_key(&cte.name) {
490 debug!(
491 "QueryEngine: CTE '{}' already in context, skipping",
492 cte.name
493 );
494 continue;
495 }
496
497 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
498 debug!(
499 "QueryEngine: Available CTEs for '{}': {:?}",
500 cte.name,
501 cte_context.keys().collect::<Vec<_>>()
502 );
503
504 let cte_result = match &cte.cte_type {
506 CTEType::Standard(query) => {
507 let view =
508 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
509
510 let mut materialized = self.materialize_view(view)?;
512
513 for column in materialized.columns_mut() {
515 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
516 column.source_table = Some(cte.name.clone());
517 }
518
519 DataView::new(Arc::new(materialized))
520 }
521 CTEType::Web(_web_spec) => {
522 return Err(anyhow!(
524 "Web CTEs should be processed in execute_select method"
525 ));
526 }
527 };
528
529 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
531 debug!(
532 "QueryEngine: CTE '{}' processed, stored in context",
533 cte.name
534 );
535 }
536
537 let source_table = if let Some(ref table_func) = statement.from_function {
539 debug!("QueryEngine: Processing table function...");
541 match table_func {
542 TableFunction::Generator { name, args } => {
543 use crate::sql::generators::GeneratorRegistry;
545
546 let registry = GeneratorRegistry::new();
548
549 if let Some(generator) = registry.get(name) {
550 let mut evaluator = ArithmeticEvaluator::with_date_notation(
552 &table,
553 self.date_notation.clone(),
554 );
555 let dummy_row = 0;
556
557 let mut evaluated_args = Vec::new();
558 for arg in args {
559 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
560 }
561
562 generator.generate(evaluated_args)?
564 } else {
565 return Err(anyhow!("Unknown generator function: {}", name));
566 }
567 }
568 }
569 } else if let Some(ref subquery) = statement.from_subquery {
570 debug!("QueryEngine: Processing FROM subquery...");
572 let subquery_result =
573 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
574
575 let materialized = self.materialize_view(subquery_result)?;
578 Arc::new(materialized)
579 } else if let Some(ref table_name) = statement.from_table {
580 if let Some(cte_view) = cte_context.get(table_name) {
582 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
583 let mut materialized = self.materialize_view((**cte_view).clone())?;
585
586 if let Some(ref alias) = statement.from_alias {
588 debug!(
589 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
590 alias, table_name
591 );
592 for column in materialized.columns_mut() {
593 if let Some(ref qualified_name) = column.qualified_name {
595 if qualified_name.starts_with(&format!("{}.", table_name)) {
596 column.qualified_name =
597 Some(qualified_name.replace(
598 &format!("{}.", table_name),
599 &format!("{}.", alias),
600 ));
601 }
602 }
603 if column.source_table.as_ref() == Some(table_name) {
605 column.source_table = Some(alias.clone());
606 }
607 }
608 }
609
610 Arc::new(materialized)
611 } else {
612 table.clone()
614 }
615 } else {
616 table.clone()
618 };
619
620 let final_table = if !statement.joins.is_empty() {
622 plan.begin_step(
623 StepType::Join,
624 format!("Process {} JOINs", statement.joins.len()),
625 );
626 plan.set_rows_in(source_table.row_count());
627
628 let join_executor = HashJoinExecutor::new(self.case_insensitive);
629 let mut current_table = source_table;
630
631 for (idx, join_clause) in statement.joins.iter().enumerate() {
632 let join_start = Instant::now();
633 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
634 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
635 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
636 plan.add_detail(format!(
637 "Executing {:?} JOIN on {} condition(s)",
638 join_clause.join_type,
639 join_clause.condition.conditions.len()
640 ));
641
642 let right_table = match &join_clause.table {
644 TableSource::Table(name) => {
645 if let Some(cte_view) = cte_context.get(name) {
647 let mut materialized = self.materialize_view((**cte_view).clone())?;
648
649 if let Some(ref alias) = join_clause.alias {
651 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
652 for column in materialized.columns_mut() {
653 if let Some(ref qualified_name) = column.qualified_name {
655 if qualified_name.starts_with(&format!("{}.", name)) {
656 column.qualified_name = Some(qualified_name.replace(
657 &format!("{}.", name),
658 &format!("{}.", alias),
659 ));
660 }
661 }
662 if column.source_table.as_ref() == Some(name) {
664 column.source_table = Some(alias.clone());
665 }
666 }
667 }
668
669 Arc::new(materialized)
670 } else {
671 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
674 }
675 }
676 TableSource::DerivedTable { query, alias: _ } => {
677 let subquery_result = self.build_view_with_context(
679 table.clone(),
680 *query.clone(),
681 cte_context,
682 )?;
683 let materialized = self.materialize_view(subquery_result)?;
684 Arc::new(materialized)
685 }
686 };
687
688 let joined = join_executor.execute_join(
690 current_table.clone(),
691 join_clause,
692 right_table.clone(),
693 )?;
694
695 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
696 plan.set_rows_out(joined.row_count());
697 plan.add_detail(format!("Result: {} rows", joined.row_count()));
698 plan.add_detail(format!(
699 "Join time: {:.3}ms",
700 join_start.elapsed().as_secs_f64() * 1000.0
701 ));
702 plan.end_step();
703
704 current_table = Arc::new(joined);
705 }
706
707 plan.set_rows_out(current_table.row_count());
708 plan.add_detail(format!(
709 "Final result after all joins: {} rows",
710 current_table.row_count()
711 ));
712 plan.end_step();
713 current_table
714 } else {
715 source_table
716 };
717
718 self.build_view_internal_with_plan(final_table, statement, plan)
720 }
721
722 fn materialize_view(&self, view: DataView) -> Result<DataTable> {
724 let source = view.source();
725 let mut result_table = DataTable::new("derived");
726
727 let visible_cols = view.visible_column_indices().to_vec();
729
730 for col_idx in &visible_cols {
732 let col = &source.columns[*col_idx];
733 let new_col = DataColumn {
734 name: col.name.clone(),
735 data_type: col.data_type.clone(),
736 nullable: col.nullable,
737 unique_values: col.unique_values,
738 null_count: col.null_count,
739 metadata: col.metadata.clone(),
740 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
743 result_table.add_column(new_col);
744 }
745
746 for row_idx in view.visible_row_indices() {
748 let source_row = &source.rows[*row_idx];
749 let mut new_row = DataRow { values: Vec::new() };
750
751 for col_idx in &visible_cols {
752 new_row.values.push(source_row.values[*col_idx].clone());
753 }
754
755 result_table.add_row(new_row);
756 }
757
758 Ok(result_table)
759 }
760
761 fn build_view_internal(
762 &self,
763 table: Arc<DataTable>,
764 statement: SelectStatement,
765 ) -> Result<DataView> {
766 let mut dummy_plan = ExecutionPlanBuilder::new();
767 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
768 }
769
770 fn build_view_internal_with_plan(
771 &self,
772 table: Arc<DataTable>,
773 statement: SelectStatement,
774 plan: &mut ExecutionPlanBuilder,
775 ) -> Result<DataView> {
776 debug!(
777 "QueryEngine::build_view - select_items: {:?}",
778 statement.select_items
779 );
780 debug!(
781 "QueryEngine::build_view - where_clause: {:?}",
782 statement.where_clause
783 );
784
785 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
787
788 if let Some(where_clause) = &statement.where_clause {
790 let total_rows = table.row_count();
791 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
792 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
793
794 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
795 plan.set_rows_in(total_rows);
796 plan.add_detail(format!("Input: {} rows", total_rows));
797
798 for condition in &where_clause.conditions {
800 plan.add_detail(format!("Condition: {:?}", condition.expr));
801 }
802
803 let filter_start = Instant::now();
804 let mut eval_context = EvaluationContext::new(self.case_insensitive);
806
807 let mut filtered_rows = Vec::new();
809 for row_idx in visible_rows {
810 if row_idx < 3 {
812 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
813 }
814 let mut evaluator =
815 RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
816 match evaluator.evaluate(where_clause, row_idx) {
817 Ok(result) => {
818 if row_idx < 3 {
819 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
820 }
821 if result {
822 filtered_rows.push(row_idx);
823 }
824 }
825 Err(e) => {
826 if row_idx < 3 {
827 debug!(
828 "QueryEngine: WHERE evaluation error for row {}: {}",
829 row_idx, e
830 );
831 }
832 return Err(e);
834 }
835 }
836 }
837
838 let (compilations, cache_hits) = eval_context.get_stats();
840 if compilations > 0 || cache_hits > 0 {
841 debug!(
842 "LIKE pattern cache: {} compilations, {} cache hits",
843 compilations, cache_hits
844 );
845 }
846 visible_rows = filtered_rows;
847 let filter_duration = filter_start.elapsed();
848 info!(
849 "WHERE clause filtering: {} rows -> {} rows in {:?}",
850 total_rows,
851 visible_rows.len(),
852 filter_duration
853 );
854
855 plan.set_rows_out(visible_rows.len());
856 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
857 plan.add_detail(format!(
858 "Filter time: {:.3}ms",
859 filter_duration.as_secs_f64() * 1000.0
860 ));
861 plan.end_step();
862 }
863
864 let mut view = DataView::new(table.clone());
866 view = view.with_rows(visible_rows);
867
868 if let Some(group_by_exprs) = &statement.group_by {
870 if !group_by_exprs.is_empty() {
871 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
872
873 plan.begin_step(
874 StepType::GroupBy,
875 format!("GROUP BY {} expressions", group_by_exprs.len()),
876 );
877 plan.set_rows_in(view.row_count());
878 plan.add_detail(format!("Input: {} rows", view.row_count()));
879 for expr in group_by_exprs {
880 plan.add_detail(format!("Group by: {:?}", expr));
881 }
882
883 let group_start = Instant::now();
884 view = self.apply_group_by(
885 view,
886 group_by_exprs,
887 &statement.select_items,
888 statement.having.as_ref(),
889 plan,
890 )?;
891
892 plan.set_rows_out(view.row_count());
893 plan.add_detail(format!("Output: {} groups", view.row_count()));
894 plan.add_detail(format!(
895 "Overall time: {:.3}ms",
896 group_start.elapsed().as_secs_f64() * 1000.0
897 ));
898 plan.end_step();
899 }
900 } else {
901 if !statement.select_items.is_empty() {
903 let has_non_star_items = statement
905 .select_items
906 .iter()
907 .any(|item| !matches!(item, SelectItem::Star));
908
909 if has_non_star_items || statement.select_items.len() > 1 {
913 view = self.apply_select_items(view, &statement.select_items)?;
914 } else {
915 }
916 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
918 debug!("QueryEngine: Using legacy columns path");
919 let source_table = view.source();
922 let column_indices =
923 self.resolve_column_indices(source_table, &statement.columns)?;
924 view = view.with_columns(column_indices);
925 }
926 }
927
928 if statement.distinct {
930 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
931 plan.set_rows_in(view.row_count());
932 plan.add_detail(format!("Input: {} rows", view.row_count()));
933
934 let distinct_start = Instant::now();
935 view = self.apply_distinct(view)?;
936
937 plan.set_rows_out(view.row_count());
938 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
939 plan.add_detail(format!(
940 "Distinct time: {:.3}ms",
941 distinct_start.elapsed().as_secs_f64() * 1000.0
942 ));
943 plan.end_step();
944 }
945
946 if let Some(order_by_columns) = &statement.order_by {
948 if !order_by_columns.is_empty() {
949 plan.begin_step(
950 StepType::Sort,
951 format!("ORDER BY {} columns", order_by_columns.len()),
952 );
953 plan.set_rows_in(view.row_count());
954 for col in order_by_columns {
955 plan.add_detail(format!("{} {:?}", col.column, col.direction));
956 }
957
958 let sort_start = Instant::now();
959 view = self.apply_multi_order_by(view, order_by_columns)?;
960
961 plan.add_detail(format!(
962 "Sort time: {:.3}ms",
963 sort_start.elapsed().as_secs_f64() * 1000.0
964 ));
965 plan.end_step();
966 }
967 }
968
969 if let Some(limit) = statement.limit {
971 let offset = statement.offset.unwrap_or(0);
972 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
973 plan.set_rows_in(view.row_count());
974 if offset > 0 {
975 plan.add_detail(format!("OFFSET: {}", offset));
976 }
977 view = view.with_limit(limit, offset);
978 plan.set_rows_out(view.row_count());
979 plan.add_detail(format!("Output: {} rows", view.row_count()));
980 plan.end_step();
981 }
982
983 Ok(view)
984 }
985
986 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
988 let mut indices = Vec::new();
989 let table_columns = table.column_names();
990
991 for col_name in columns {
992 let index = table_columns
993 .iter()
994 .position(|c| c.eq_ignore_ascii_case(col_name))
995 .ok_or_else(|| {
996 let suggestion = self.find_similar_column(table, col_name);
997 match suggestion {
998 Some(similar) => anyhow::anyhow!(
999 "Column '{}' not found. Did you mean '{}'?",
1000 col_name,
1001 similar
1002 ),
1003 None => anyhow::anyhow!("Column '{}' not found", col_name),
1004 }
1005 })?;
1006 indices.push(index);
1007 }
1008
1009 Ok(indices)
1010 }
1011
1012 fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
1014 debug!(
1015 "QueryEngine::apply_select_items - items: {:?}",
1016 select_items
1017 );
1018 debug!(
1019 "QueryEngine::apply_select_items - input view has {} rows",
1020 view.row_count()
1021 );
1022
1023 let has_aggregates = select_items.iter().any(|item| match item {
1027 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1028 SelectItem::Column(_) => false,
1029 SelectItem::Star => false,
1030 });
1031
1032 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1033 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1034 SelectItem::Column(_) => false, SelectItem::Star => false, });
1037
1038 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1039 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1042 return self.apply_aggregate_select(view, select_items);
1043 }
1044
1045 let has_computed_expressions = select_items
1047 .iter()
1048 .any(|item| matches!(item, SelectItem::Expression { .. }));
1049
1050 debug!(
1051 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1052 has_computed_expressions
1053 );
1054
1055 if !has_computed_expressions {
1056 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1058 return Ok(view.with_columns(column_indices));
1059 }
1060
1061 let source_table = view.source();
1066 let visible_rows = view.visible_row_indices();
1067
1068 let mut computed_table = DataTable::new("query_result");
1071
1072 let mut expanded_items = Vec::new();
1074 for item in select_items {
1075 match item {
1076 SelectItem::Star => {
1077 for col_name in source_table.column_names() {
1079 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1080 col_name.to_string(),
1081 )));
1082 }
1083 }
1084 _ => expanded_items.push(item.clone()),
1085 }
1086 }
1087
1088 let mut column_name_counts: std::collections::HashMap<String, usize> =
1090 std::collections::HashMap::new();
1091
1092 for item in &expanded_items {
1093 let base_name = match item {
1094 SelectItem::Column(col_ref) => col_ref.name.clone(),
1095 SelectItem::Expression { alias, .. } => alias.clone(),
1096 SelectItem::Star => unreachable!("Star should have been expanded"),
1097 };
1098
1099 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1101 let column_name = if *count == 0 {
1102 base_name.clone()
1104 } else {
1105 format!("{base_name}_{count}")
1107 };
1108 *count += 1;
1109
1110 computed_table.add_column(DataColumn::new(&column_name));
1111 }
1112
1113 let mut evaluator =
1115 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1116
1117 for &row_idx in visible_rows {
1118 let mut row_values = Vec::new();
1119
1120 for item in &expanded_items {
1121 let value = match item {
1122 SelectItem::Column(col_ref) => {
1123 let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1125 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1127 let result = source_table.find_column_by_qualified_name(&qualified_name);
1128
1129 if result.is_none() {
1131 let available_qualified: Vec<String> = source_table.columns.iter()
1132 .filter_map(|c| c.qualified_name.clone())
1133 .collect();
1134 let available_simple: Vec<String> = source_table.columns.iter()
1135 .map(|c| c.name.clone())
1136 .collect();
1137 debug!(
1138 "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1139 qualified_name, available_qualified, available_simple
1140 );
1141 }
1143 result
1144 } else {
1145 source_table.get_column_index(&col_ref.name)
1147 }
1148 .ok_or_else(|| {
1149 let display_name = if let Some(prefix) = &col_ref.table_prefix {
1150 format!("{}.{}", prefix, col_ref.name)
1151 } else {
1152 col_ref.name.clone()
1153 };
1154 let suggestion = self.find_similar_column(source_table, &col_ref.name);
1155 match suggestion {
1156 Some(similar) => anyhow::anyhow!(
1157 "Column '{}' not found. Did you mean '{}'?",
1158 display_name,
1159 similar
1160 ),
1161 None => anyhow::anyhow!("Column '{}' not found", display_name),
1162 }
1163 })?;
1164 let row = source_table
1165 .get_row(row_idx)
1166 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1167 row.get(col_idx)
1168 .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1169 .clone()
1170 }
1171 SelectItem::Expression { expr, .. } => {
1172 evaluator.evaluate(expr, row_idx)?
1174 }
1175 SelectItem::Star => unreachable!("Star should have been expanded"),
1176 };
1177 row_values.push(value);
1178 }
1179
1180 computed_table
1181 .add_row(DataRow::new(row_values))
1182 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1183 }
1184
1185 Ok(DataView::new(Arc::new(computed_table)))
1188 }
1189
1190 fn apply_aggregate_select(
1192 &self,
1193 view: DataView,
1194 select_items: &[SelectItem],
1195 ) -> Result<DataView> {
1196 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1197
1198 let source_table = view.source();
1199 let mut result_table = DataTable::new("aggregate_result");
1200
1201 for item in select_items {
1203 let column_name = match item {
1204 SelectItem::Expression { alias, .. } => alias.clone(),
1205 _ => unreachable!("Should only have expressions in aggregate-only query"),
1206 };
1207 result_table.add_column(DataColumn::new(&column_name));
1208 }
1209
1210 let visible_rows = view.visible_row_indices().to_vec();
1212 let mut evaluator =
1213 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1214 .with_visible_rows(visible_rows);
1215
1216 let mut row_values = Vec::new();
1218 for item in select_items {
1219 match item {
1220 SelectItem::Expression { expr, .. } => {
1221 let value = evaluator.evaluate(expr, 0)?;
1224 row_values.push(value);
1225 }
1226 _ => unreachable!("Should only have expressions in aggregate-only query"),
1227 }
1228 }
1229
1230 result_table
1232 .add_row(DataRow::new(row_values))
1233 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1234
1235 Ok(DataView::new(Arc::new(result_table)))
1236 }
1237
1238 fn resolve_select_columns(
1240 &self,
1241 table: &DataTable,
1242 select_items: &[SelectItem],
1243 ) -> Result<Vec<usize>> {
1244 let mut indices = Vec::new();
1245 let table_columns = table.column_names();
1246
1247 for item in select_items {
1248 match item {
1249 SelectItem::Column(col_ref) => {
1250 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1252 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1254 table.find_column_by_qualified_name(&qualified_name)
1255 .ok_or_else(|| {
1256 let has_qualified = table.columns.iter()
1258 .any(|c| c.qualified_name.is_some());
1259 if !has_qualified {
1260 anyhow::anyhow!(
1261 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1262 qualified_name, table_prefix
1263 )
1264 } else {
1265 anyhow::anyhow!("Column '{}' not found", qualified_name)
1266 }
1267 })?
1268 } else {
1269 table_columns
1271 .iter()
1272 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1273 .ok_or_else(|| {
1274 let suggestion = self.find_similar_column(table, &col_ref.name);
1275 match suggestion {
1276 Some(similar) => anyhow::anyhow!(
1277 "Column '{}' not found. Did you mean '{}'?",
1278 col_ref.name,
1279 similar
1280 ),
1281 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1282 }
1283 })?
1284 };
1285 indices.push(index);
1286 }
1287 SelectItem::Star => {
1288 for i in 0..table_columns.len() {
1290 indices.push(i);
1291 }
1292 }
1293 SelectItem::Expression { .. } => {
1294 return Err(anyhow::anyhow!(
1295 "Computed expressions require new table creation"
1296 ));
1297 }
1298 }
1299 }
1300
1301 Ok(indices)
1302 }
1303
1304 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1306 use std::collections::HashSet;
1307
1308 let source = view.source();
1309 let visible_cols = view.visible_column_indices();
1310 let visible_rows = view.visible_row_indices();
1311
1312 let mut seen_rows = HashSet::new();
1314 let mut unique_row_indices = Vec::new();
1315
1316 for &row_idx in visible_rows {
1317 let mut row_key = Vec::new();
1319 for &col_idx in visible_cols {
1320 let value = source
1321 .get_value(row_idx, col_idx)
1322 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1323 row_key.push(format!("{:?}", value));
1325 }
1326
1327 if seen_rows.insert(row_key) {
1329 unique_row_indices.push(row_idx);
1331 }
1332 }
1333
1334 Ok(view.with_rows(unique_row_indices))
1336 }
1337
1338 fn apply_multi_order_by(
1340 &self,
1341 mut view: DataView,
1342 order_by_columns: &[OrderByColumn],
1343 ) -> Result<DataView> {
1344 let mut sort_columns = Vec::new();
1346
1347 for order_col in order_by_columns {
1348 let col_index = view
1352 .source()
1353 .get_column_index(&order_col.column)
1354 .ok_or_else(|| {
1355 let suggestion = self.find_similar_column(view.source(), &order_col.column);
1357 match suggestion {
1358 Some(similar) => anyhow::anyhow!(
1359 "Column '{}' not found. Did you mean '{}'?",
1360 order_col.column,
1361 similar
1362 ),
1363 None => {
1364 let available_cols = view.source().column_names().join(", ");
1366 anyhow::anyhow!(
1367 "Column '{}' not found. Available columns: {}",
1368 order_col.column,
1369 available_cols
1370 )
1371 }
1372 }
1373 })?;
1374
1375 let ascending = matches!(order_col.direction, SortDirection::Asc);
1376 sort_columns.push((col_index, ascending));
1377 }
1378
1379 view.apply_multi_sort(&sort_columns)?;
1381 Ok(view)
1382 }
1383
1384 fn apply_group_by(
1386 &self,
1387 view: DataView,
1388 group_by_exprs: &[SqlExpression],
1389 select_items: &[SelectItem],
1390 having: Option<&SqlExpression>,
1391 plan: &mut ExecutionPlanBuilder,
1392 ) -> Result<DataView> {
1393 let (result_view, phase_info) = self.apply_group_by_expressions(
1395 view,
1396 group_by_exprs,
1397 select_items,
1398 having,
1399 self.case_insensitive,
1400 self.date_notation.clone(),
1401 )?;
1402
1403 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1405 plan.add_detail(format!(
1406 "Phase 1 - Group Building: {:.3}ms",
1407 phase_info.phase2_key_building.as_secs_f64() * 1000.0
1408 ));
1409 plan.add_detail(format!(
1410 " • Processing {} rows into {} groups",
1411 phase_info.total_rows, phase_info.num_groups
1412 ));
1413 plan.add_detail(format!(
1414 "Phase 2 - Aggregation: {:.3}ms",
1415 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
1416 ));
1417 if phase_info.phase4_having_evaluation > Duration::ZERO {
1418 plan.add_detail(format!(
1419 "Phase 3 - HAVING Filter: {:.3}ms",
1420 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
1421 ));
1422 plan.add_detail(format!(
1423 " • Filtered {} groups",
1424 phase_info.groups_filtered_by_having
1425 ));
1426 }
1427 plan.add_detail(format!(
1428 "Total GROUP BY time: {:.3}ms",
1429 phase_info.total_time.as_secs_f64() * 1000.0
1430 ));
1431
1432 Ok(result_view)
1433 }
1434
1435 pub fn estimate_group_cardinality(
1438 &self,
1439 view: &DataView,
1440 group_by_exprs: &[SqlExpression],
1441 ) -> usize {
1442 let row_count = view.get_visible_rows().len();
1444 if row_count <= 100 {
1445 return row_count;
1446 }
1447
1448 let sample_size = min(1000, row_count / 10).max(100);
1450 let mut seen = FxHashSet::default();
1451
1452 let visible_rows = view.get_visible_rows();
1453 for (i, &row_idx) in visible_rows.iter().enumerate() {
1454 if i >= sample_size {
1455 break;
1456 }
1457
1458 let mut key_values = Vec::new();
1460 for expr in group_by_exprs {
1461 let mut evaluator = ArithmeticEvaluator::new(view.source());
1462 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1463 key_values.push(value);
1464 }
1465
1466 seen.insert(key_values);
1467 }
1468
1469 let sample_cardinality = seen.len();
1471 let estimated = (sample_cardinality * row_count) / sample_size;
1472
1473 estimated.min(row_count).max(sample_cardinality)
1475 }
1476}
1477
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::datatable::{DataColumn, DataRow, DataValue};

    /// Install a DEBUG-level tracing subscriber for the test run.
    ///
    /// The `try_init` result is discarded, so calling this from several tests
    /// is harmless.
    fn init_tracing() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();
    }

    /// Shorthand for an owned string cell.
    fn text(value: &str) -> DataValue {
        DataValue::String(value.to_string())
    }

    /// Build a shared table with the given name, columns and rows.
    ///
    /// Panics (via `unwrap`) if a row cannot be added.
    fn build_table(name: &str, columns: &[&str], rows: Vec<Vec<DataValue>>) -> Arc<DataTable> {
        let mut table = DataTable::new(name);
        for &column in columns {
            table.add_column(DataColumn::new(column));
        }
        for values in rows {
            table.add_row(DataRow::new(values)).unwrap();
        }
        Arc::new(table)
    }

    /// Print the row count, then one line per row showing the listed
    /// `(column index, label)` cells as `label = value` pairs.
    fn print_table_columns(table: &DataTable, columns: &[(usize, &str)]) {
        println!("Table has {} rows", table.row_count());
        for row in 0..table.row_count() {
            let cells: Vec<String> = columns
                .iter()
                .map(|&(col, label)| format!("{label} = {:?}", table.get_value(row, col)))
                .collect();
            println!("Row {row}: {}", cells.join(", "));
        }
    }

    /// Execute `sql` against `table`, panicking with `"<failure_label>: <error>"`
    /// if the engine reports an error.
    fn run_query(
        engine: &QueryEngine,
        table: &Arc<DataTable>,
        sql: &str,
        failure_label: &str,
    ) -> DataView {
        engine
            .execute(Arc::clone(table), sql)
            .unwrap_or_else(|e| panic!("{failure_label}: {e}"))
    }

    /// Three-row id/name/age fixture shared by the basic SELECT tests.
    fn create_test_table() -> Arc<DataTable> {
        build_table(
            "test",
            &["id", "name", "age"],
            vec![
                vec![DataValue::Integer(1), text("Alice"), DataValue::Integer(30)],
                vec![DataValue::Integer(2), text("Bob"), DataValue::Integer(25)],
                vec![DataValue::Integer(3), text("Charlie"), DataValue::Integer(35)],
            ],
        )
    }

    #[test]
    fn test_select_all() {
        let table = create_test_table();
        let engine = QueryEngine::new();

        let view = engine
            .execute(Arc::clone(&table), "SELECT * FROM users")
            .unwrap();

        assert_eq!(view.row_count(), 3);
        assert_eq!(view.column_count(), 3);
    }

    #[test]
    fn test_select_columns() {
        let table = create_test_table();
        let engine = QueryEngine::new();

        let view = engine
            .execute(Arc::clone(&table), "SELECT name, age FROM users")
            .unwrap();

        assert_eq!(view.row_count(), 3);
        assert_eq!(view.column_count(), 2);
    }

    #[test]
    fn test_select_with_limit() {
        let table = create_test_table();
        let engine = QueryEngine::new();

        let view = engine
            .execute(Arc::clone(&table), "SELECT * FROM users LIMIT 2")
            .unwrap();

        assert_eq!(view.row_count(), 2);
    }

    #[test]
    fn test_type_coercion_contains() {
        init_tracing();

        let table = build_table(
            "test",
            &["id", "status", "price"],
            vec![
                vec![DataValue::Integer(1), text("Pending"), DataValue::Float(99.99)],
                vec![DataValue::Integer(2), text("Confirmed"), DataValue::Float(150.50)],
                vec![DataValue::Integer(3), text("Pending"), DataValue::Float(75.00)],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing WHERE clause with Contains ===");
        print_table_columns(&table, &[(1, "status")]);

        println!("\n--- Test 1: status.Contains('pend') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE status.Contains('pend')",
            "Query failed",
        );
        println!("SUCCESS: Found {} matching rows", view.row_count());
        assert_eq!(view.row_count(), 2);

        println!("\n--- Test 2: price.Contains('9') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE price.Contains('9')",
            "Numeric coercion query failed",
        );
        println!(
            "SUCCESS: Found {} matching rows with price containing '9'",
            view.row_count()
        );
        assert!(view.row_count() >= 1);

        println!("\n=== All tests passed! ===");
    }

    #[test]
    fn test_not_in_clause() {
        init_tracing();

        let table = build_table(
            "test",
            &["id", "country"],
            vec![
                vec![DataValue::Integer(1), text("CA")],
                vec![DataValue::Integer(2), text("US")],
                vec![DataValue::Integer(3), text("UK")],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing NOT IN clause ===");
        print_table_columns(&table, &[(1, "country")]);

        println!("\n--- Test: country NOT IN ('CA') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE country NOT IN ('CA')",
            "NOT IN query failed",
        );
        println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
        assert_eq!(view.row_count(), 2);

        println!("\n=== NOT IN test complete! ===");
    }

    #[test]
    fn test_case_insensitive_in_and_not_in() {
        init_tracing();

        // Mixed-case country codes so case handling is observable.
        let table = build_table(
            "test",
            &["id", "country"],
            vec![
                vec![DataValue::Integer(1), text("CA")],
                vec![DataValue::Integer(2), text("us")],
                vec![DataValue::Integer(3), text("UK")],
            ],
        );

        println!("\n=== Testing Case-Insensitive IN clause ===");
        print_table_columns(&table, &[(1, "country")]);

        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
        let engine = QueryEngine::with_case_insensitive(true);
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE country IN ('ca')",
            "Case-insensitive IN query failed",
        );
        println!(
            "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
            view.row_count()
        );
        assert_eq!(view.row_count(), 1);

        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE country NOT IN ('ca')",
            "Case-insensitive NOT IN query failed",
        );
        println!(
            "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
            view.row_count()
        );
        assert_eq!(view.row_count(), 2);

        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
        let engine_case_sensitive = QueryEngine::new();
        let view = run_query(
            &engine_case_sensitive,
            &table,
            "SELECT * FROM test WHERE country IN ('ca')",
            "Case-sensitive IN query failed",
        );
        println!(
            "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
            view.row_count()
        );
        assert_eq!(view.row_count(), 0);

        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
    }

    #[test]
    #[ignore = "Parentheses in WHERE clause not yet implemented"]
    fn test_parentheses_in_where_clause() {
        init_tracing();

        let table = build_table(
            "test",
            &["id", "status", "priority"],
            vec![
                vec![DataValue::Integer(1), text("Pending"), text("High")],
                vec![DataValue::Integer(2), text("Complete"), text("High")],
                vec![DataValue::Integer(3), text("Pending"), text("Low")],
                vec![DataValue::Integer(4), text("Complete"), text("Low")],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing Parentheses in WHERE clause ===");
        print_table_columns(&table, &[(1, "status"), (2, "priority")]);

        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
            "Parentheses query failed",
        );
        println!(
            "SUCCESS: Found {} rows with parenthetical logic",
            view.row_count()
        );
        assert_eq!(view.row_count(), 2);

        println!("\n=== Parentheses test complete! ===");
    }

    #[test]
    #[ignore = "Numeric type coercion needs fixing"]
    fn test_numeric_type_coercion() {
        init_tracing();

        // The third price is stored as an Integer, unlike the first two.
        let table = build_table(
            "test",
            &["id", "price", "quantity"],
            vec![
                vec![
                    DataValue::Integer(1),
                    DataValue::Float(99.50),
                    DataValue::Integer(100),
                ],
                vec![
                    DataValue::Integer(2),
                    DataValue::Float(150.0),
                    DataValue::Integer(200),
                ],
                vec![
                    DataValue::Integer(3),
                    DataValue::Integer(75),
                    DataValue::Integer(50),
                ],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing Numeric Type Coercion ===");
        print_table_columns(&table, &[(1, "price"), (2, "quantity")]);

        println!("\n--- Test: price.Contains('.') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE price.Contains('.')",
            "Numeric Contains query failed",
        );
        println!(
            "SUCCESS: Found {} rows with decimal points in price",
            view.row_count()
        );
        assert_eq!(view.row_count(), 2);

        println!("\n--- Test: quantity.Contains('0') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE quantity.Contains('0')",
            "Integer Contains query failed",
        );
        println!(
            "SUCCESS: Found {} rows with '0' in quantity",
            view.row_count()
        );
        assert_eq!(view.row_count(), 2);

        println!("\n=== Numeric type coercion test complete! ===");
    }

    #[test]
    fn test_datetime_comparisons() {
        init_tracing();

        let table = build_table(
            "test",
            &["id", "created_date"],
            vec![
                vec![DataValue::Integer(1), text("2024-12-15")],
                vec![DataValue::Integer(2), text("2025-01-15")],
                vec![DataValue::Integer(3), text("2025-02-15")],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing DateTime Comparisons ===");
        print_table_columns(&table, &[(1, "created_date")]);

        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
            "DateTime comparison query failed",
        );
        println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
        assert_eq!(view.row_count(), 2);

        println!("\n=== DateTime comparison test complete! ===");
    }

    #[test]
    fn test_not_with_method_calls() {
        init_tracing();

        let table = build_table(
            "test",
            &["id", "status"],
            vec![
                vec![DataValue::Integer(1), text("Pending Review")],
                vec![DataValue::Integer(2), text("Complete")],
                vec![DataValue::Integer(3), text("Pending Approval")],
            ],
        );
        let engine = QueryEngine::with_case_insensitive(true);

        println!("\n=== Testing NOT with Method Calls ===");
        print_table_columns(&table, &[(1, "status")]);

        println!("\n--- Test: NOT status.Contains('pend') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
            "NOT Contains query failed",
        );
        println!(
            "SUCCESS: Found {} rows NOT containing 'pend'",
            view.row_count()
        );
        assert_eq!(view.row_count(), 1);

        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
            "NOT StartsWith query failed",
        );
        println!(
            "SUCCESS: Found {} rows NOT starting with 'Pending'",
            view.row_count()
        );
        assert_eq!(view.row_count(), 1);

        println!("\n=== NOT with method calls test complete! ===");
    }

    #[test]
    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
    fn test_complex_logical_expressions() {
        init_tracing();

        let table = build_table(
            "test",
            &["id", "status", "priority", "assigned"],
            vec![
                vec![DataValue::Integer(1), text("Pending"), text("High"), text("John")],
                vec![DataValue::Integer(2), text("Complete"), text("High"), text("Jane")],
                vec![DataValue::Integer(3), text("Pending"), text("Low"), text("John")],
                vec![
                    DataValue::Integer(4),
                    text("In Progress"),
                    text("Medium"),
                    text("Jane"),
                ],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing Complex Logical Expressions ===");
        print_table_columns(&table, &[(1, "status"), (2, "priority"), (3, "assigned")]);

        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
            "Complex logic query failed",
        );
        println!(
            "SUCCESS: Found {} rows with complex logic",
            view.row_count()
        );
        assert_eq!(view.row_count(), 2);

        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
            "NOT complex logic query failed",
        );
        println!(
            "SUCCESS: Found {} rows with NOT complex logic",
            view.row_count()
        );
        assert_eq!(view.row_count(), 2);

        println!("\n=== Complex logical expressions test complete! ===");
    }

    #[test]
    fn test_mixed_data_types_and_edge_cases() {
        init_tracing();

        // The `value` column deliberately mixes strings, a float and a boolean.
        let table = build_table(
            "test",
            &["id", "value", "nullable_field"],
            vec![
                vec![DataValue::Integer(1), text("123.45"), text("present")],
                vec![DataValue::Integer(2), DataValue::Float(678.90), DataValue::Null],
                vec![
                    DataValue::Integer(3),
                    DataValue::Boolean(true),
                    text("also present"),
                ],
                vec![DataValue::Integer(4), text("false"), DataValue::Null],
            ],
        );
        let engine = QueryEngine::new();

        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
        print_table_columns(&table, &[(1, "value"), (2, "nullable_field")]);

        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE value.Contains('true')",
            "Boolean coercion query failed",
        );
        println!(
            "SUCCESS: Found {} rows with boolean coercion",
            view.row_count()
        );
        assert_eq!(view.row_count(), 1);

        println!("\n--- Test: id IN (1, 3) ---");
        let view = run_query(
            &engine,
            &table,
            "SELECT * FROM test WHERE id IN (1, 3)",
            "Multiple IN values query failed",
        );
        println!("SUCCESS: Found {} rows with IN clause", view.row_count());
        assert_eq!(view.row_count(), 2);

        println!("\n=== Mixed data types test complete! ===");
    }

    #[test]
    fn test_aggregate_only_single_row() {
        let table = create_test_stock_data();
        let engine = QueryEngine::new();

        let result = engine
            .execute(
                Arc::clone(&table),
                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
            )
            .expect("Query should succeed");

        assert_eq!(
            result.row_count(),
            1,
            "Aggregate-only query should return exactly 1 row"
        );
        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

        let source = result.source();
        let row = source.get_row(0).expect("Should have first row");

        // COUNT(*), MIN(close), MAX(close) over the five fixture rows.
        assert_eq!(row.values[0], DataValue::Integer(5));
        assert_eq!(row.values[1], DataValue::Float(99.5));
        assert_eq!(row.values[2], DataValue::Float(105.0));

        // AVG is compared with a tolerance because it is a float.
        match &row.values[3] {
            DataValue::Float(avg) => assert!(
                (avg - 102.4).abs() < 0.01,
                "Average should be approximately 102.4, got {}",
                avg
            ),
            _ => panic!("AVG should return a Float value"),
        }
    }

    #[test]
    fn test_single_aggregate_single_row() {
        let table = create_test_stock_data();
        let engine = QueryEngine::new();

        let result = engine
            .execute(Arc::clone(&table), "SELECT COUNT(*) FROM stock")
            .expect("Query should succeed");

        assert_eq!(
            result.row_count(),
            1,
            "Single aggregate query should return exactly 1 row"
        );
        assert_eq!(result.column_count(), 1, "Should have 1 column");

        let source = result.source();
        let row = source.get_row(0).expect("Should have first row");
        assert_eq!(row.values[0], DataValue::Integer(5));
    }

    #[test]
    fn test_aggregate_with_where_single_row() {
        let table = create_test_stock_data();
        let engine = QueryEngine::new();

        let result = engine
            .execute(
                Arc::clone(&table),
                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
            )
            .expect("Query should succeed");

        assert_eq!(
            result.row_count(),
            1,
            "Filtered aggregate query should return exactly 1 row"
        );
        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

        let source = result.source();
        let row = source.get_row(0).expect("Should have first row");

        // Only the 103.5 and 105.0 closes satisfy the filter.
        assert_eq!(row.values[0], DataValue::Integer(2));
        assert_eq!(row.values[1], DataValue::Float(103.5));
        assert_eq!(row.values[2], DataValue::Float(105.0));
    }

    #[test]
    fn test_not_in_parsing() {
        use crate::sql::recursive_parser::Parser;

        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
        println!("\n=== Testing NOT IN parsing ===");
        println!("Parsing query: {query}");

        let mut parser = Parser::new(query);
        let statement = parser
            .parse()
            .unwrap_or_else(|e| panic!("Parse error: {e}"));

        println!("Parsed statement: {statement:#?}");
        if let Some(where_clause) = statement.where_clause {
            println!("WHERE conditions: {:#?}", where_clause.conditions);
            if let Some(first_condition) = where_clause.conditions.first() {
                println!("First condition expression: {:#?}", first_condition.expr);
            }
        }
    }

    /// Five-row symbol/close/volume fixture used by the aggregate tests.
    fn create_test_stock_data() -> Arc<DataTable> {
        let mut table = DataTable::new("stock");

        for column in ["symbol", "close", "volume"] {
            table.add_column(DataColumn::new(column));
        }

        let bars = vec![
            ("AAPL", 99.5, 1000),
            ("AAPL", 101.2, 1500),
            ("AAPL", 103.5, 2000),
            ("AAPL", 105.0, 1200),
            ("AAPL", 102.8, 1800),
        ];

        for (symbol, close, volume) in bars {
            let cells = vec![
                DataValue::String(symbol.to_string()),
                DataValue::Float(close),
                DataValue::Integer(volume),
            ];
            table
                .add_row(DataRow::new(cells))
                .expect("Should add row successfully");
        }

        Arc::new(table)
    }
}
2466
// Additional tests for this module, compiled only for test builds and loaded
// from the sibling file named in the `#[path]` attribute.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;