1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::{GroupByExpressions, GroupByPhaseInfo};
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26 TableFunction,
27};
28
29#[derive(Clone)]
31pub struct QueryEngine {
32 case_insensitive: bool,
33 date_notation: String,
34 behavior_config: Option<BehaviorConfig>,
35}
36
37impl Default for QueryEngine {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl QueryEngine {
44 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 case_insensitive: false,
48 date_notation: get_date_notation(),
49 behavior_config: None,
50 }
51 }
52
53 #[must_use]
54 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55 let case_insensitive = config.case_insensitive_default;
56 let date_notation = get_date_notation();
58 Self {
59 case_insensitive,
60 date_notation,
61 behavior_config: Some(config),
62 }
63 }
64
65 #[must_use]
66 pub fn with_date_notation(_date_notation: String) -> Self {
67 Self {
68 case_insensitive: false,
69 date_notation: get_date_notation(), behavior_config: None,
71 }
72 }
73
74 #[must_use]
75 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76 Self {
77 case_insensitive,
78 date_notation: get_date_notation(),
79 behavior_config: None,
80 }
81 }
82
83 #[must_use]
84 pub fn with_case_insensitive_and_date_notation(
85 case_insensitive: bool,
86 _date_notation: String, ) -> Self {
88 Self {
89 case_insensitive,
90 date_notation: get_date_notation(), behavior_config: None,
92 }
93 }
94
95 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97 let columns = table.column_names();
98 let mut best_match: Option<(String, usize)> = None;
99
100 for col in columns {
101 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102 let max_distance = if name.len() > 10 { 3 } else { 2 };
105 if distance <= max_distance {
106 match &best_match {
107 None => best_match = Some((col, distance)),
108 Some((_, best_dist)) if distance < *best_dist => {
109 best_match = Some((col, distance));
110 }
111 _ => {}
112 }
113 }
114 }
115
116 best_match.map(|(name, _)| name)
117 }
118
119 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121 let len1 = s1.len();
122 let len2 = s2.len();
123 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125 for i in 0..=len1 {
126 matrix[i][0] = i;
127 }
128 for j in 0..=len2 {
129 matrix[0][j] = j;
130 }
131
132 for (i, c1) in s1.chars().enumerate() {
133 for (j, c2) in s2.chars().enumerate() {
134 let cost = usize::from(c1 != c2);
135 matrix[i + 1][j + 1] = std::cmp::min(
136 matrix[i][j + 1] + 1, std::cmp::min(
138 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
141 );
142 }
143 }
144
145 matrix[len1][len2]
146 }
147
148 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150 let (view, _plan) = self.execute_with_plan(table, sql)?;
151 Ok(view)
152 }
153
154 pub fn execute_statement(
156 &self,
157 table: Arc<DataTable>,
158 statement: SelectStatement,
159 ) -> Result<DataView> {
160 let mut cte_context = HashMap::new();
162 for cte in &statement.ctes {
163 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164 let cte_result = match &cte.cte_type {
166 CTEType::Standard(query) => {
167 let view = self.build_view_with_context(
169 table.clone(),
170 query.clone(),
171 &mut cte_context,
172 )?;
173
174 let mut materialized = self.materialize_view(view)?;
176
177 for column in materialized.columns_mut() {
179 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
180 column.source_table = Some(cte.name.clone());
181 }
182
183 DataView::new(Arc::new(materialized))
184 }
185 CTEType::Web(web_spec) => {
186 use crate::web::http_fetcher::WebDataFetcher;
188
189 let fetcher = WebDataFetcher::new()?;
190 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
191
192 for column in data_table.columns_mut() {
194 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195 column.source_table = Some(cte.name.clone());
196 }
197
198 DataView::new(Arc::new(data_table))
200 }
201 };
202 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204 debug!(
205 "QueryEngine: CTE '{}' pre-processed, stored in context",
206 cte.name
207 );
208 }
209
210 let mut subquery_executor =
212 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215 self.build_view_with_context(table, processed_statement, &mut cte_context)
217 }
218
219 pub fn execute_statement_with_cte_context(
221 &self,
222 table: Arc<DataTable>,
223 statement: SelectStatement,
224 cte_context: &HashMap<String, Arc<DataView>>,
225 ) -> Result<DataView> {
226 let mut local_context = cte_context.clone();
228
229 for cte in &statement.ctes {
231 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232 let cte_result = match &cte.cte_type {
233 CTEType::Standard(query) => {
234 let view = self.build_view_with_context(
235 table.clone(),
236 query.clone(),
237 &mut local_context,
238 )?;
239
240 let mut materialized = self.materialize_view(view)?;
242
243 for column in materialized.columns_mut() {
245 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246 column.source_table = Some(cte.name.clone());
247 }
248
249 DataView::new(Arc::new(materialized))
250 }
251 CTEType::Web(web_spec) => {
252 use crate::web::http_fetcher::WebDataFetcher;
254
255 let fetcher = WebDataFetcher::new()?;
256 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
257
258 for column in data_table.columns_mut() {
260 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261 column.source_table = Some(cte.name.clone());
262 }
263
264 DataView::new(Arc::new(data_table))
266 }
267 };
268 local_context.insert(cte.name.clone(), Arc::new(cte_result));
269 }
270
271 let mut subquery_executor =
273 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
274 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
275
276 self.build_view_with_context(table, processed_statement, &mut local_context)
278 }
279
280 pub fn execute_with_plan(
282 &self,
283 table: Arc<DataTable>,
284 sql: &str,
285 ) -> Result<(DataView, ExecutionPlan)> {
286 let mut plan_builder = ExecutionPlanBuilder::new();
287 let start_time = Instant::now();
288
289 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
291 plan_builder.add_detail(format!("Query: {}", sql));
292 let mut parser = Parser::new(sql);
293 let statement = parser
294 .parse()
295 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
296 plan_builder.add_detail(format!("Parsed successfully"));
297 if let Some(from) = &statement.from_table {
298 plan_builder.add_detail(format!("FROM: {}", from));
299 }
300 if statement.where_clause.is_some() {
301 plan_builder.add_detail("WHERE clause present".to_string());
302 }
303 plan_builder.end_step();
304
305 let mut cte_context = HashMap::new();
307
308 if !statement.ctes.is_empty() {
309 plan_builder.begin_step(
310 StepType::CTE,
311 format!("Process {} CTEs", statement.ctes.len()),
312 );
313
314 for cte in &statement.ctes {
315 let cte_start = Instant::now();
316 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
317
318 let cte_result = match &cte.cte_type {
319 CTEType::Standard(query) => {
320 if let Some(from) = &query.from_table {
322 plan_builder.add_detail(format!("Source: {}", from));
323 }
324 if query.where_clause.is_some() {
325 plan_builder.add_detail("Has WHERE clause".to_string());
326 }
327 if query.group_by.is_some() {
328 plan_builder.add_detail("Has GROUP BY".to_string());
329 }
330
331 debug!(
332 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
333 cte.name,
334 cte_context.keys().collect::<Vec<_>>()
335 );
336
337 let mut subquery_executor = SubqueryExecutor::with_cte_context(
340 self.clone(),
341 table.clone(),
342 cte_context.clone(),
343 );
344 let processed_query = subquery_executor.execute_subqueries(query)?;
345
346 let view = self.build_view_with_context(
347 table.clone(),
348 processed_query,
349 &mut cte_context,
350 )?;
351
352 let mut materialized = self.materialize_view(view)?;
354
355 for column in materialized.columns_mut() {
357 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
358 column.source_table = Some(cte.name.clone());
359 }
360
361 DataView::new(Arc::new(materialized))
362 }
363 CTEType::Web(web_spec) => {
364 plan_builder.add_detail(format!("URL: {}", web_spec.url));
365 if let Some(format) = &web_spec.format {
366 plan_builder.add_detail(format!("Format: {:?}", format));
367 }
368 if let Some(cache) = web_spec.cache_seconds {
369 plan_builder.add_detail(format!("Cache: {} seconds", cache));
370 }
371
372 use crate::web::http_fetcher::WebDataFetcher;
374
375 let fetcher = WebDataFetcher::new()?;
376 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
377
378 for column in data_table.columns_mut() {
380 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
381 column.source_table = Some(cte.name.clone());
382 }
383
384 DataView::new(Arc::new(data_table))
386 }
387 };
388
389 plan_builder.set_rows_out(cte_result.row_count());
391 plan_builder.add_detail(format!(
392 "Result: {} rows, {} columns",
393 cte_result.row_count(),
394 cte_result.column_count()
395 ));
396 plan_builder.add_detail(format!(
397 "Execution time: {:.3}ms",
398 cte_start.elapsed().as_secs_f64() * 1000.0
399 ));
400
401 debug!(
402 "QueryEngine: Storing CTE '{}' in context with {} rows",
403 cte.name,
404 cte_result.row_count()
405 );
406 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
407 plan_builder.end_step();
408 }
409
410 plan_builder.add_detail(format!(
411 "All {} CTEs cached in context",
412 statement.ctes.len()
413 ));
414 plan_builder.end_step();
415 }
416
417 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
419 let mut subquery_executor =
420 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
421
422 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
424 format!("{:?}", w).contains("Subquery")
426 });
427
428 if has_subqueries {
429 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
430 }
431
432 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
433
434 if has_subqueries {
435 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
436 } else {
437 plan_builder.add_detail("No subqueries to process".to_string());
438 }
439
440 plan_builder.end_step();
441 let result = self.build_view_with_context_and_plan(
442 table,
443 processed_statement,
444 &mut cte_context,
445 &mut plan_builder,
446 )?;
447
448 let total_duration = start_time.elapsed();
449 info!(
450 "Query execution complete: total={:?}, rows={}",
451 total_duration,
452 result.row_count()
453 );
454
455 let plan = plan_builder.build();
456 Ok((result, plan))
457 }
458
459 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
461 let mut cte_context = HashMap::new();
462 self.build_view_with_context(table, statement, &mut cte_context)
463 }
464
465 fn build_view_with_context(
467 &self,
468 table: Arc<DataTable>,
469 statement: SelectStatement,
470 cte_context: &mut HashMap<String, Arc<DataView>>,
471 ) -> Result<DataView> {
472 let mut dummy_plan = ExecutionPlanBuilder::new();
473 self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
474 }
475
476 fn build_view_with_context_and_plan(
478 &self,
479 table: Arc<DataTable>,
480 statement: SelectStatement,
481 cte_context: &mut HashMap<String, Arc<DataView>>,
482 plan: &mut ExecutionPlanBuilder,
483 ) -> Result<DataView> {
484 for cte in &statement.ctes {
486 if cte_context.contains_key(&cte.name) {
488 debug!(
489 "QueryEngine: CTE '{}' already in context, skipping",
490 cte.name
491 );
492 continue;
493 }
494
495 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
496 debug!(
497 "QueryEngine: Available CTEs for '{}': {:?}",
498 cte.name,
499 cte_context.keys().collect::<Vec<_>>()
500 );
501
502 let cte_result = match &cte.cte_type {
504 CTEType::Standard(query) => {
505 let view =
506 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
507
508 let mut materialized = self.materialize_view(view)?;
510
511 for column in materialized.columns_mut() {
513 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
514 column.source_table = Some(cte.name.clone());
515 }
516
517 DataView::new(Arc::new(materialized))
518 }
519 CTEType::Web(_web_spec) => {
520 return Err(anyhow!(
522 "Web CTEs should be processed in execute_select method"
523 ));
524 }
525 };
526
527 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
529 debug!(
530 "QueryEngine: CTE '{}' processed, stored in context",
531 cte.name
532 );
533 }
534
535 let source_table = if let Some(ref table_func) = statement.from_function {
537 debug!("QueryEngine: Processing table function...");
539 match table_func {
540 TableFunction::Generator { name, args } => {
541 use crate::sql::generators::GeneratorRegistry;
543
544 let registry = GeneratorRegistry::new();
546
547 if let Some(generator) = registry.get(name) {
548 let mut evaluator = ArithmeticEvaluator::with_date_notation(
550 &table,
551 self.date_notation.clone(),
552 );
553 let dummy_row = 0;
554
555 let mut evaluated_args = Vec::new();
556 for arg in args {
557 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
558 }
559
560 generator.generate(evaluated_args)?
562 } else {
563 return Err(anyhow!("Unknown generator function: {}", name));
564 }
565 }
566 }
567 } else if let Some(ref subquery) = statement.from_subquery {
568 debug!("QueryEngine: Processing FROM subquery...");
570 let subquery_result =
571 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
572
573 let materialized = self.materialize_view(subquery_result)?;
576 Arc::new(materialized)
577 } else if let Some(ref table_name) = statement.from_table {
578 if let Some(cte_view) = cte_context.get(table_name) {
580 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
581 let mut materialized = self.materialize_view((**cte_view).clone())?;
583
584 if let Some(ref alias) = statement.from_alias {
586 debug!(
587 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
588 alias, table_name
589 );
590 for column in materialized.columns_mut() {
591 if let Some(ref qualified_name) = column.qualified_name {
593 if qualified_name.starts_with(&format!("{}.", table_name)) {
594 column.qualified_name =
595 Some(qualified_name.replace(
596 &format!("{}.", table_name),
597 &format!("{}.", alias),
598 ));
599 }
600 }
601 if column.source_table.as_ref() == Some(table_name) {
603 column.source_table = Some(alias.clone());
604 }
605 }
606 }
607
608 Arc::new(materialized)
609 } else {
610 table.clone()
612 }
613 } else {
614 table.clone()
616 };
617
618 let final_table = if !statement.joins.is_empty() {
620 plan.begin_step(
621 StepType::Join,
622 format!("Process {} JOINs", statement.joins.len()),
623 );
624 plan.set_rows_in(source_table.row_count());
625
626 let join_executor = HashJoinExecutor::new(self.case_insensitive);
627 let mut current_table = source_table;
628
629 for (idx, join_clause) in statement.joins.iter().enumerate() {
630 let join_start = Instant::now();
631 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
632 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
633 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
634 plan.add_detail(format!(
635 "Executing {:?} JOIN on {} condition(s)",
636 join_clause.join_type,
637 join_clause.condition.conditions.len()
638 ));
639
640 let right_table = match &join_clause.table {
642 TableSource::Table(name) => {
643 if let Some(cte_view) = cte_context.get(name) {
645 let mut materialized = self.materialize_view((**cte_view).clone())?;
646
647 if let Some(ref alias) = join_clause.alias {
649 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
650 for column in materialized.columns_mut() {
651 if let Some(ref qualified_name) = column.qualified_name {
653 if qualified_name.starts_with(&format!("{}.", name)) {
654 column.qualified_name = Some(qualified_name.replace(
655 &format!("{}.", name),
656 &format!("{}.", alias),
657 ));
658 }
659 }
660 if column.source_table.as_ref() == Some(name) {
662 column.source_table = Some(alias.clone());
663 }
664 }
665 }
666
667 Arc::new(materialized)
668 } else {
669 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
672 }
673 }
674 TableSource::DerivedTable { query, alias: _ } => {
675 let subquery_result = self.build_view_with_context(
677 table.clone(),
678 *query.clone(),
679 cte_context,
680 )?;
681 let materialized = self.materialize_view(subquery_result)?;
682 Arc::new(materialized)
683 }
684 };
685
686 let joined = join_executor.execute_join(
688 current_table.clone(),
689 join_clause,
690 right_table.clone(),
691 )?;
692
693 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
694 plan.set_rows_out(joined.row_count());
695 plan.add_detail(format!("Result: {} rows", joined.row_count()));
696 plan.add_detail(format!(
697 "Join time: {:.3}ms",
698 join_start.elapsed().as_secs_f64() * 1000.0
699 ));
700 plan.end_step();
701
702 current_table = Arc::new(joined);
703 }
704
705 plan.set_rows_out(current_table.row_count());
706 plan.add_detail(format!(
707 "Final result after all joins: {} rows",
708 current_table.row_count()
709 ));
710 plan.end_step();
711 current_table
712 } else {
713 source_table
714 };
715
716 self.build_view_internal_with_plan(final_table, statement, plan)
718 }
719
720 fn materialize_view(&self, view: DataView) -> Result<DataTable> {
722 let source = view.source();
723 let mut result_table = DataTable::new("derived");
724
725 let visible_cols = view.visible_column_indices().to_vec();
727
728 for col_idx in &visible_cols {
730 let col = &source.columns[*col_idx];
731 let new_col = DataColumn {
732 name: col.name.clone(),
733 data_type: col.data_type.clone(),
734 nullable: col.nullable,
735 unique_values: col.unique_values,
736 null_count: col.null_count,
737 metadata: col.metadata.clone(),
738 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
741 result_table.add_column(new_col);
742 }
743
744 for row_idx in view.visible_row_indices() {
746 let source_row = &source.rows[*row_idx];
747 let mut new_row = DataRow { values: Vec::new() };
748
749 for col_idx in &visible_cols {
750 new_row.values.push(source_row.values[*col_idx].clone());
751 }
752
753 result_table.add_row(new_row);
754 }
755
756 Ok(result_table)
757 }
758
759 fn build_view_internal(
760 &self,
761 table: Arc<DataTable>,
762 statement: SelectStatement,
763 ) -> Result<DataView> {
764 let mut dummy_plan = ExecutionPlanBuilder::new();
765 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
766 }
767
768 fn build_view_internal_with_plan(
769 &self,
770 table: Arc<DataTable>,
771 statement: SelectStatement,
772 plan: &mut ExecutionPlanBuilder,
773 ) -> Result<DataView> {
774 debug!(
775 "QueryEngine::build_view - select_items: {:?}",
776 statement.select_items
777 );
778 debug!(
779 "QueryEngine::build_view - where_clause: {:?}",
780 statement.where_clause
781 );
782
783 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
785
786 if let Some(where_clause) = &statement.where_clause {
788 let total_rows = table.row_count();
789 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
790 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
791
792 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
793 plan.set_rows_in(total_rows);
794 plan.add_detail(format!("Input: {} rows", total_rows));
795
796 for condition in &where_clause.conditions {
798 plan.add_detail(format!("Condition: {:?}", condition.expr));
799 }
800
801 let filter_start = Instant::now();
802 let mut eval_context = EvaluationContext::new(self.case_insensitive);
804
805 let mut filtered_rows = Vec::new();
807 for row_idx in visible_rows {
808 if row_idx < 3 {
810 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
811 }
812 let mut evaluator =
813 RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
814 match evaluator.evaluate(where_clause, row_idx) {
815 Ok(result) => {
816 if row_idx < 3 {
817 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
818 }
819 if result {
820 filtered_rows.push(row_idx);
821 }
822 }
823 Err(e) => {
824 if row_idx < 3 {
825 debug!(
826 "QueryEngine: WHERE evaluation error for row {}: {}",
827 row_idx, e
828 );
829 }
830 return Err(e);
832 }
833 }
834 }
835
836 let (compilations, cache_hits) = eval_context.get_stats();
838 if compilations > 0 || cache_hits > 0 {
839 debug!(
840 "LIKE pattern cache: {} compilations, {} cache hits",
841 compilations, cache_hits
842 );
843 }
844 visible_rows = filtered_rows;
845 let filter_duration = filter_start.elapsed();
846 info!(
847 "WHERE clause filtering: {} rows -> {} rows in {:?}",
848 total_rows,
849 visible_rows.len(),
850 filter_duration
851 );
852
853 plan.set_rows_out(visible_rows.len());
854 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
855 plan.add_detail(format!(
856 "Filter time: {:.3}ms",
857 filter_duration.as_secs_f64() * 1000.0
858 ));
859 plan.end_step();
860 }
861
862 let mut view = DataView::new(table.clone());
864 view = view.with_rows(visible_rows);
865
866 if let Some(group_by_exprs) = &statement.group_by {
868 if !group_by_exprs.is_empty() {
869 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
870
871 plan.begin_step(
872 StepType::GroupBy,
873 format!("GROUP BY {} expressions", group_by_exprs.len()),
874 );
875 plan.set_rows_in(view.row_count());
876 plan.add_detail(format!("Input: {} rows", view.row_count()));
877 for expr in group_by_exprs {
878 plan.add_detail(format!("Group by: {:?}", expr));
879 }
880
881 let group_start = Instant::now();
882 view = self.apply_group_by(
883 view,
884 group_by_exprs,
885 &statement.select_items,
886 statement.having.as_ref(),
887 plan,
888 )?;
889
890 plan.set_rows_out(view.row_count());
891 plan.add_detail(format!("Output: {} groups", view.row_count()));
892 plan.add_detail(format!(
893 "Overall time: {:.3}ms",
894 group_start.elapsed().as_secs_f64() * 1000.0
895 ));
896 plan.end_step();
897 }
898 } else {
899 if !statement.select_items.is_empty() {
901 let has_non_star_items = statement
903 .select_items
904 .iter()
905 .any(|item| !matches!(item, SelectItem::Star));
906
907 if has_non_star_items || statement.select_items.len() > 1 {
911 view = self.apply_select_items(view, &statement.select_items)?;
912 } else {
913 }
914 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
916 debug!("QueryEngine: Using legacy columns path");
917 let source_table = view.source();
920 let column_indices =
921 self.resolve_column_indices(source_table, &statement.columns)?;
922 view = view.with_columns(column_indices);
923 }
924 }
925
926 if statement.distinct {
928 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
929 plan.set_rows_in(view.row_count());
930 plan.add_detail(format!("Input: {} rows", view.row_count()));
931
932 let distinct_start = Instant::now();
933 view = self.apply_distinct(view)?;
934
935 plan.set_rows_out(view.row_count());
936 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
937 plan.add_detail(format!(
938 "Distinct time: {:.3}ms",
939 distinct_start.elapsed().as_secs_f64() * 1000.0
940 ));
941 plan.end_step();
942 }
943
944 if let Some(order_by_columns) = &statement.order_by {
946 if !order_by_columns.is_empty() {
947 plan.begin_step(
948 StepType::Sort,
949 format!("ORDER BY {} columns", order_by_columns.len()),
950 );
951 plan.set_rows_in(view.row_count());
952 for col in order_by_columns {
953 plan.add_detail(format!("{} {:?}", col.column, col.direction));
954 }
955
956 let sort_start = Instant::now();
957 view = self.apply_multi_order_by(view, order_by_columns)?;
958
959 plan.add_detail(format!(
960 "Sort time: {:.3}ms",
961 sort_start.elapsed().as_secs_f64() * 1000.0
962 ));
963 plan.end_step();
964 }
965 }
966
967 if let Some(limit) = statement.limit {
969 let offset = statement.offset.unwrap_or(0);
970 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
971 plan.set_rows_in(view.row_count());
972 if offset > 0 {
973 plan.add_detail(format!("OFFSET: {}", offset));
974 }
975 view = view.with_limit(limit, offset);
976 plan.set_rows_out(view.row_count());
977 plan.add_detail(format!("Output: {} rows", view.row_count()));
978 plan.end_step();
979 }
980
981 Ok(view)
982 }
983
984 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
986 let mut indices = Vec::new();
987 let table_columns = table.column_names();
988
989 for col_name in columns {
990 let index = table_columns
991 .iter()
992 .position(|c| c.eq_ignore_ascii_case(col_name))
993 .ok_or_else(|| {
994 let suggestion = self.find_similar_column(table, col_name);
995 match suggestion {
996 Some(similar) => anyhow::anyhow!(
997 "Column '{}' not found. Did you mean '{}'?",
998 col_name,
999 similar
1000 ),
1001 None => anyhow::anyhow!("Column '{}' not found", col_name),
1002 }
1003 })?;
1004 indices.push(index);
1005 }
1006
1007 Ok(indices)
1008 }
1009
1010 fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
1012 debug!(
1013 "QueryEngine::apply_select_items - items: {:?}",
1014 select_items
1015 );
1016 debug!(
1017 "QueryEngine::apply_select_items - input view has {} rows",
1018 view.row_count()
1019 );
1020
1021 let has_aggregates = select_items.iter().any(|item| match item {
1025 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1026 SelectItem::Column(_) => false,
1027 SelectItem::Star => false,
1028 });
1029
1030 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1031 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1032 SelectItem::Column(_) => false, SelectItem::Star => false, });
1035
1036 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1037 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1040 return self.apply_aggregate_select(view, select_items);
1041 }
1042
1043 let has_computed_expressions = select_items
1045 .iter()
1046 .any(|item| matches!(item, SelectItem::Expression { .. }));
1047
1048 debug!(
1049 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1050 has_computed_expressions
1051 );
1052
1053 if !has_computed_expressions {
1054 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1056 return Ok(view.with_columns(column_indices));
1057 }
1058
1059 let source_table = view.source();
1064 let visible_rows = view.visible_row_indices();
1065
1066 let mut computed_table = DataTable::new("query_result");
1069
1070 let mut expanded_items = Vec::new();
1072 for item in select_items {
1073 match item {
1074 SelectItem::Star => {
1075 for col_name in source_table.column_names() {
1077 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1078 col_name.to_string(),
1079 )));
1080 }
1081 }
1082 _ => expanded_items.push(item.clone()),
1083 }
1084 }
1085
1086 let mut column_name_counts: std::collections::HashMap<String, usize> =
1088 std::collections::HashMap::new();
1089
1090 for item in &expanded_items {
1091 let base_name = match item {
1092 SelectItem::Column(col_ref) => col_ref.name.clone(),
1093 SelectItem::Expression { alias, .. } => alias.clone(),
1094 SelectItem::Star => unreachable!("Star should have been expanded"),
1095 };
1096
1097 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1099 let column_name = if *count == 0 {
1100 base_name.clone()
1102 } else {
1103 format!("{base_name}_{count}")
1105 };
1106 *count += 1;
1107
1108 computed_table.add_column(DataColumn::new(&column_name));
1109 }
1110
1111 let mut evaluator =
1113 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1114
1115 for &row_idx in visible_rows {
1116 let mut row_values = Vec::new();
1117
1118 for item in &expanded_items {
1119 let value = match item {
1120 SelectItem::Column(col_ref) => {
1121 let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
1123 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1125 let result = source_table.find_column_by_qualified_name(&qualified_name);
1126
1127 if result.is_none() {
1129 let available_qualified: Vec<String> = source_table.columns.iter()
1130 .filter_map(|c| c.qualified_name.clone())
1131 .collect();
1132 let available_simple: Vec<String> = source_table.columns.iter()
1133 .map(|c| c.name.clone())
1134 .collect();
1135 debug!(
1136 "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
1137 qualified_name, available_qualified, available_simple
1138 );
1139 }
1141 result
1142 } else {
1143 source_table.get_column_index(&col_ref.name)
1145 }
1146 .ok_or_else(|| {
1147 let display_name = if let Some(prefix) = &col_ref.table_prefix {
1148 format!("{}.{}", prefix, col_ref.name)
1149 } else {
1150 col_ref.name.clone()
1151 };
1152 let suggestion = self.find_similar_column(source_table, &col_ref.name);
1153 match suggestion {
1154 Some(similar) => anyhow::anyhow!(
1155 "Column '{}' not found. Did you mean '{}'?",
1156 display_name,
1157 similar
1158 ),
1159 None => anyhow::anyhow!("Column '{}' not found", display_name),
1160 }
1161 })?;
1162 let row = source_table
1163 .get_row(row_idx)
1164 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1165 row.get(col_idx)
1166 .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
1167 .clone()
1168 }
1169 SelectItem::Expression { expr, .. } => {
1170 evaluator.evaluate(expr, row_idx)?
1172 }
1173 SelectItem::Star => unreachable!("Star should have been expanded"),
1174 };
1175 row_values.push(value);
1176 }
1177
1178 computed_table
1179 .add_row(DataRow::new(row_values))
1180 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1181 }
1182
1183 Ok(DataView::new(Arc::new(computed_table)))
1186 }
1187
1188 fn apply_aggregate_select(
1190 &self,
1191 view: DataView,
1192 select_items: &[SelectItem],
1193 ) -> Result<DataView> {
1194 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1195
1196 let source_table = view.source();
1197 let mut result_table = DataTable::new("aggregate_result");
1198
1199 for item in select_items {
1201 let column_name = match item {
1202 SelectItem::Expression { alias, .. } => alias.clone(),
1203 _ => unreachable!("Should only have expressions in aggregate-only query"),
1204 };
1205 result_table.add_column(DataColumn::new(&column_name));
1206 }
1207
1208 let visible_rows = view.visible_row_indices().to_vec();
1210 let mut evaluator =
1211 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1212 .with_visible_rows(visible_rows);
1213
1214 let mut row_values = Vec::new();
1216 for item in select_items {
1217 match item {
1218 SelectItem::Expression { expr, .. } => {
1219 let value = evaluator.evaluate(expr, 0)?;
1222 row_values.push(value);
1223 }
1224 _ => unreachable!("Should only have expressions in aggregate-only query"),
1225 }
1226 }
1227
1228 result_table
1230 .add_row(DataRow::new(row_values))
1231 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1232
1233 Ok(DataView::new(Arc::new(result_table)))
1234 }
1235
1236 fn resolve_select_columns(
1238 &self,
1239 table: &DataTable,
1240 select_items: &[SelectItem],
1241 ) -> Result<Vec<usize>> {
1242 let mut indices = Vec::new();
1243 let table_columns = table.column_names();
1244
1245 for item in select_items {
1246 match item {
1247 SelectItem::Column(col_ref) => {
1248 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1250 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1252 table.find_column_by_qualified_name(&qualified_name)
1253 .ok_or_else(|| {
1254 let has_qualified = table.columns.iter()
1256 .any(|c| c.qualified_name.is_some());
1257 if !has_qualified {
1258 anyhow::anyhow!(
1259 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1260 qualified_name, table_prefix
1261 )
1262 } else {
1263 anyhow::anyhow!("Column '{}' not found", qualified_name)
1264 }
1265 })?
1266 } else {
1267 table_columns
1269 .iter()
1270 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1271 .ok_or_else(|| {
1272 let suggestion = self.find_similar_column(table, &col_ref.name);
1273 match suggestion {
1274 Some(similar) => anyhow::anyhow!(
1275 "Column '{}' not found. Did you mean '{}'?",
1276 col_ref.name,
1277 similar
1278 ),
1279 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1280 }
1281 })?
1282 };
1283 indices.push(index);
1284 }
1285 SelectItem::Star => {
1286 for i in 0..table_columns.len() {
1288 indices.push(i);
1289 }
1290 }
1291 SelectItem::Expression { .. } => {
1292 return Err(anyhow::anyhow!(
1293 "Computed expressions require new table creation"
1294 ));
1295 }
1296 }
1297 }
1298
1299 Ok(indices)
1300 }
1301
1302 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1304 use std::collections::HashSet;
1305
1306 let source = view.source();
1307 let visible_cols = view.visible_column_indices();
1308 let visible_rows = view.visible_row_indices();
1309
1310 let mut seen_rows = HashSet::new();
1312 let mut unique_row_indices = Vec::new();
1313
1314 for &row_idx in visible_rows {
1315 let mut row_key = Vec::new();
1317 for &col_idx in visible_cols {
1318 let value = source
1319 .get_value(row_idx, col_idx)
1320 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1321 row_key.push(format!("{:?}", value));
1323 }
1324
1325 if seen_rows.insert(row_key) {
1327 unique_row_indices.push(row_idx);
1329 }
1330 }
1331
1332 Ok(view.with_rows(unique_row_indices))
1334 }
1335
1336 fn apply_multi_order_by(
1338 &self,
1339 mut view: DataView,
1340 order_by_columns: &[OrderByColumn],
1341 ) -> Result<DataView> {
1342 let mut sort_columns = Vec::new();
1344
1345 for order_col in order_by_columns {
1346 let col_index = view
1350 .source()
1351 .get_column_index(&order_col.column)
1352 .ok_or_else(|| {
1353 let suggestion = self.find_similar_column(view.source(), &order_col.column);
1355 match suggestion {
1356 Some(similar) => anyhow::anyhow!(
1357 "Column '{}' not found. Did you mean '{}'?",
1358 order_col.column,
1359 similar
1360 ),
1361 None => {
1362 let available_cols = view.source().column_names().join(", ");
1364 anyhow::anyhow!(
1365 "Column '{}' not found. Available columns: {}",
1366 order_col.column,
1367 available_cols
1368 )
1369 }
1370 }
1371 })?;
1372
1373 let ascending = matches!(order_col.direction, SortDirection::Asc);
1374 sort_columns.push((col_index, ascending));
1375 }
1376
1377 view.apply_multi_sort(&sort_columns)?;
1379 Ok(view)
1380 }
1381
1382 fn apply_group_by(
1384 &self,
1385 view: DataView,
1386 group_by_exprs: &[SqlExpression],
1387 select_items: &[SelectItem],
1388 having: Option<&SqlExpression>,
1389 plan: &mut ExecutionPlanBuilder,
1390 ) -> Result<DataView> {
1391 let (result_view, phase_info) = self.apply_group_by_expressions(
1393 view,
1394 group_by_exprs,
1395 select_items,
1396 having,
1397 self.case_insensitive,
1398 self.date_notation.clone(),
1399 )?;
1400
1401 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1403 plan.add_detail(format!(
1404 "Phase 1 - Group Building: {:.3}ms",
1405 phase_info.phase2_key_building.as_secs_f64() * 1000.0
1406 ));
1407 plan.add_detail(format!(
1408 " • Processing {} rows into {} groups",
1409 phase_info.total_rows, phase_info.num_groups
1410 ));
1411 plan.add_detail(format!(
1412 "Phase 2 - Aggregation: {:.3}ms",
1413 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
1414 ));
1415 if phase_info.phase4_having_evaluation > Duration::ZERO {
1416 plan.add_detail(format!(
1417 "Phase 3 - HAVING Filter: {:.3}ms",
1418 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
1419 ));
1420 plan.add_detail(format!(
1421 " • Filtered {} groups",
1422 phase_info.groups_filtered_by_having
1423 ));
1424 }
1425 plan.add_detail(format!(
1426 "Total GROUP BY time: {:.3}ms",
1427 phase_info.total_time.as_secs_f64() * 1000.0
1428 ));
1429
1430 Ok(result_view)
1431 }
1432
1433 pub fn estimate_group_cardinality(
1436 &self,
1437 view: &DataView,
1438 group_by_exprs: &[SqlExpression],
1439 ) -> usize {
1440 let row_count = view.get_visible_rows().len();
1442 if row_count <= 100 {
1443 return row_count;
1444 }
1445
1446 let sample_size = min(1000, row_count / 10).max(100);
1448 let mut seen = FxHashSet::default();
1449
1450 let visible_rows = view.get_visible_rows();
1451 for (i, &row_idx) in visible_rows.iter().enumerate() {
1452 if i >= sample_size {
1453 break;
1454 }
1455
1456 let mut key_values = Vec::new();
1458 for expr in group_by_exprs {
1459 let mut evaluator = ArithmeticEvaluator::new(view.source());
1460 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1461 key_values.push(value);
1462 }
1463
1464 seen.insert(key_values);
1465 }
1466
1467 let sample_cardinality = seen.len();
1469 let estimated = (sample_cardinality * row_count) / sample_size;
1470
1471 estimated.min(row_count).max(sample_cardinality)
1473 }
1474}
1475
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::datatable::{DataColumn, DataRow, DataValue};

    /// Builds a three-row `id` / `name` / `age` fixture used by the basic
    /// SELECT tests.
    fn create_test_table() -> Arc<DataTable> {
        let mut table = DataTable::new("test");

        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("name"));
        table.add_column(DataColumn::new("age"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("Alice".to_string()),
                DataValue::Integer(30),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("Bob".to_string()),
                DataValue::Integer(25),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("Charlie".to_string()),
                DataValue::Integer(35),
            ]))
            .unwrap();

        Arc::new(table)
    }

    #[test]
    fn test_select_all() {
        let table = create_test_table();
        let engine = QueryEngine::new();

        let view = engine
            .execute(table.clone(), "SELECT * FROM users")
            .unwrap();
        assert_eq!(view.row_count(), 3);
        assert_eq!(view.column_count(), 3);
    }

    #[test]
    fn test_select_columns() {
        let table = create_test_table();
        let engine = QueryEngine::new();

        let view = engine
            .execute(table.clone(), "SELECT name, age FROM users")
            .unwrap();
        assert_eq!(view.row_count(), 3);
        assert_eq!(view.column_count(), 2);
    }

    #[test]
    fn test_select_with_limit() {
        let table = create_test_table();
        let engine = QueryEngine::new();

        let view = engine
            .execute(table.clone(), "SELECT * FROM users LIMIT 2")
            .unwrap();
        assert_eq!(view.row_count(), 2);
    }

    #[test]
    fn test_type_coercion_contains() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("status"));
        table.add_column(DataColumn::new("price"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("Pending".to_string()),
                DataValue::Float(99.99),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("Confirmed".to_string()),
                DataValue::Float(150.50),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("Pending".to_string()),
                DataValue::Float(75.00),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing WHERE clause with Contains ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let status = table.get_value(i, 1);
            println!("Row {i}: status = {status:?}");
        }

        println!("\n--- Test 1: status.Contains('pend') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        );
        match result {
            Ok(view) => {
                println!("SUCCESS: Found {} matching rows", view.row_count());
                // NOTE(review): the engine here is not in case-insensitive
                // mode, yet 'pend' is expected to match "Pending" twice.
                // Presumably Contains is case-insensitive by default; confirm.
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("Query failed: {e}");
            }
        }

        println!("\n--- Test 2: price.Contains('9') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} matching rows with price containing '9'",
                    view.row_count()
                );
                assert!(view.row_count() >= 1);
            }
            Err(e) => {
                panic!("Numeric coercion query failed: {e}");
            }
        }

        println!("\n=== All tests passed! ===");
    }

    #[test]
    fn test_not_in_clause() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("country"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("CA".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("US".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("UK".to_string()),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing NOT IN clause ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let country = table.get_value(i, 1);
            println!("Row {i}: country = {country:?}");
        }

        println!("\n--- Test: country NOT IN ('CA') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('CA')",
        );
        match result {
            Ok(view) => {
                println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("NOT IN query failed: {e}");
            }
        }

        println!("\n=== NOT IN test complete! ===");
    }

    #[test]
    fn test_case_insensitive_in_and_not_in() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("country"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("CA".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("us".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("UK".to_string()),
            ]))
            .unwrap();

        let table = Arc::new(table);

        println!("\n=== Testing Case-Insensitive IN clause ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let country = table.get_value(i, 1);
            println!("Row {i}: country = {country:?}");
        }

        println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
        let engine = QueryEngine::with_case_insensitive(true);
        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
                    view.row_count()
                );
                assert_eq!(view.row_count(), 1);
            }
            Err(e) => {
                panic!("Case-insensitive IN query failed: {e}");
            }
        }

        println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('ca')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
                    view.row_count()
                );
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("Case-insensitive NOT IN query failed: {e}");
            }
        }

        println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
        let engine_case_sensitive = QueryEngine::new();
        let result = engine_case_sensitive
            .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
                    view.row_count()
                );
                assert_eq!(view.row_count(), 0);
            }
            Err(e) => {
                panic!("Case-sensitive IN query failed: {e}");
            }
        }

        println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
    }

    #[test]
    #[ignore = "Parentheses in WHERE clause not yet implemented"]
    fn test_parentheses_in_where_clause() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("status"));
        table.add_column(DataColumn::new("priority"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("Pending".to_string()),
                DataValue::String("High".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("Complete".to_string()),
                DataValue::String("High".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("Pending".to_string()),
                DataValue::String("Low".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(4),
                DataValue::String("Complete".to_string()),
                DataValue::String("Low".to_string()),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing Parentheses in WHERE clause ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let status = table.get_value(i, 1);
            let priority = table.get_value(i, 2);
            println!("Row {i}: status = {status:?}, priority = {priority:?}");
        }

        println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows with parenthetical logic",
                    view.row_count()
                );
                // Rows 1 (Pending/High) and 4 (Complete/Low).
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("Parentheses query failed: {e}");
            }
        }

        println!("\n=== Parentheses test complete! ===");
    }

    #[test]
    #[ignore = "Numeric type coercion needs fixing"]
    fn test_numeric_type_coercion() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("price"));
        table.add_column(DataColumn::new("quantity"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::Float(99.50),
                DataValue::Integer(100),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::Float(150.0),
                DataValue::Integer(200),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::Integer(75),
                DataValue::Integer(50),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing Numeric Type Coercion ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let price = table.get_value(i, 1);
            let quantity = table.get_value(i, 2);
            println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
        }

        println!("\n--- Test: price.Contains('.') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('.')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows with decimal points in price",
                    view.row_count()
                );
                // NOTE(review): expecting 2 assumes Float(150.0) is rendered
                // with a decimal point (e.g. "150.0"). Rust's default float
                // Display prints "150", which would give 1. Confirm the
                // engine's float-to-string coercion.
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("Numeric Contains query failed: {e}");
            }
        }

        println!("\n--- Test: quantity.Contains('0') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE quantity.Contains('0')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows with '0' in quantity",
                    view.row_count()
                );
                // All three quantities (100, 200, 50) contain the digit '0'.
                assert_eq!(
                    view.row_count(),
                    3,
                    "quantities 100, 200 and 50 all contain '0'"
                );
            }
            Err(e) => {
                panic!("Integer Contains query failed: {e}");
            }
        }

        println!("\n=== Numeric type coercion test complete! ===");
    }

    #[test]
    fn test_datetime_comparisons() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("created_date"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("2024-12-15".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("2025-01-15".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("2025-02-15".to_string()),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing DateTime Comparisons ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let date = table.get_value(i, 1);
            println!("Row {i}: created_date = {date:?}");
        }

        println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
        );
        match result {
            Ok(view) => {
                println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("DateTime comparison query failed: {e}");
            }
        }

        println!("\n=== DateTime comparison test complete! ===");
    }

    #[test]
    fn test_not_with_method_calls() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("status"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("Pending Review".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("Complete".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("Pending Approval".to_string()),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::with_case_insensitive(true);

        println!("\n=== Testing NOT with Method Calls ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let status = table.get_value(i, 1);
            println!("Row {i}: status = {status:?}");
        }

        println!("\n--- Test: NOT status.Contains('pend') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows NOT containing 'pend'",
                    view.row_count()
                );
                assert_eq!(view.row_count(), 1);
            }
            Err(e) => {
                panic!("NOT Contains query failed: {e}");
            }
        }

        println!("\n--- Test: NOT status.StartsWith('Pending') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows NOT starting with 'Pending'",
                    view.row_count()
                );
                assert_eq!(view.row_count(), 1);
            }
            Err(e) => {
                panic!("NOT StartsWith query failed: {e}");
            }
        }

        println!("\n=== NOT with method calls test complete! ===");
    }

    #[test]
    #[ignore = "Complex logical expressions with parentheses not yet implemented"]
    fn test_complex_logical_expressions() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("status"));
        table.add_column(DataColumn::new("priority"));
        table.add_column(DataColumn::new("assigned"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("Pending".to_string()),
                DataValue::String("High".to_string()),
                DataValue::String("John".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::String("Complete".to_string()),
                DataValue::String("High".to_string()),
                DataValue::String("Jane".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::String("Pending".to_string()),
                DataValue::String("Low".to_string()),
                DataValue::String("John".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(4),
                DataValue::String("In Progress".to_string()),
                DataValue::String("Medium".to_string()),
                DataValue::String("Jane".to_string()),
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing Complex Logical Expressions ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let status = table.get_value(i, 1);
            let priority = table.get_value(i, 2);
            let assigned = table.get_value(i, 3);
            println!(
                "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
            );
        }

        println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows with complex logic",
                    view.row_count()
                );
                // Rows 1 and 3 are Pending and assigned to John.
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("Complex logic query failed: {e}");
            }
        }

        println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows with NOT complex logic",
                    view.row_count()
                );
                // Rows 1 (Pending/High) and 4 (In Progress/Medium).
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("NOT complex logic query failed: {e}");
            }
        }

        println!("\n=== Complex logical expressions test complete! ===");
    }

    #[test]
    fn test_mixed_data_types_and_edge_cases() {
        let _ = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .try_init();

        let mut table = DataTable::new("test");
        table.add_column(DataColumn::new("id"));
        table.add_column(DataColumn::new("value"));
        table.add_column(DataColumn::new("nullable_field"));

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(1),
                DataValue::String("123.45".to_string()),
                DataValue::String("present".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(2),
                DataValue::Float(678.90),
                DataValue::Null,
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(3),
                DataValue::Boolean(true),
                DataValue::String("also present".to_string()),
            ]))
            .unwrap();

        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(4),
                DataValue::String("false".to_string()),
                DataValue::Null,
            ]))
            .unwrap();

        let table = Arc::new(table);
        let engine = QueryEngine::new();

        println!("\n=== Testing Mixed Data Types and Edge Cases ===");
        println!("Table has {} rows", table.row_count());
        for i in 0..table.row_count() {
            let value = table.get_value(i, 1);
            let nullable = table.get_value(i, 2);
            println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
        }

        println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
        let result = engine.execute(
            table.clone(),
            "SELECT * FROM test WHERE value.Contains('true')",
        );
        match result {
            Ok(view) => {
                println!(
                    "SUCCESS: Found {} rows with boolean coercion",
                    view.row_count()
                );
                assert_eq!(view.row_count(), 1);
            }
            Err(e) => {
                panic!("Boolean coercion query failed: {e}");
            }
        }

        println!("\n--- Test: id IN (1, 3) ---");
        let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
        match result {
            Ok(view) => {
                println!("SUCCESS: Found {} rows with IN clause", view.row_count());
                assert_eq!(view.row_count(), 2);
            }
            Err(e) => {
                panic!("Multiple IN values query failed: {e}");
            }
        }

        println!("\n=== Mixed data types test complete! ===");
    }

    #[test]
    fn test_aggregate_only_single_row() {
        let table = create_test_stock_data();
        let engine = QueryEngine::new();

        let result = engine
            .execute(
                table.clone(),
                "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
            )
            .expect("Query should succeed");

        assert_eq!(
            result.row_count(),
            1,
            "Aggregate-only query should return exactly 1 row"
        );
        assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

        let source = result.source();
        let row = source.get_row(0).expect("Should have first row");

        assert_eq!(row.values[0], DataValue::Integer(5));

        assert_eq!(row.values[1], DataValue::Float(99.5));

        assert_eq!(row.values[2], DataValue::Float(105.0));

        // (99.5 + 101.2 + 103.5 + 105.0 + 102.8) / 5 = 102.4
        if let DataValue::Float(avg) = &row.values[3] {
            assert!(
                (avg - 102.4).abs() < 0.01,
                "Average should be approximately 102.4, got {}",
                avg
            );
        } else {
            panic!("AVG should return a Float value");
        }
    }

    #[test]
    fn test_single_aggregate_single_row() {
        let table = create_test_stock_data();
        let engine = QueryEngine::new();

        let result = engine
            .execute(table.clone(), "SELECT COUNT(*) FROM stock")
            .expect("Query should succeed");

        assert_eq!(
            result.row_count(),
            1,
            "Single aggregate query should return exactly 1 row"
        );
        assert_eq!(result.column_count(), 1, "Should have 1 column");

        let source = result.source();
        let row = source.get_row(0).expect("Should have first row");
        assert_eq!(row.values[0], DataValue::Integer(5));
    }

    #[test]
    fn test_aggregate_with_where_single_row() {
        let table = create_test_stock_data();
        let engine = QueryEngine::new();

        let result = engine
            .execute(
                table.clone(),
                "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
            )
            .expect("Query should succeed");

        assert_eq!(
            result.row_count(),
            1,
            "Filtered aggregate query should return exactly 1 row"
        );
        assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

        let source = result.source();
        let row = source.get_row(0).expect("Should have first row");

        // Only the 103.5 and 105.0 closes pass the filter.
        assert_eq!(row.values[0], DataValue::Integer(2));
        assert_eq!(row.values[1], DataValue::Float(103.5));
        assert_eq!(row.values[2], DataValue::Float(105.0));
    }

    #[test]
    fn test_not_in_parsing() {
        use crate::sql::recursive_parser::Parser;

        let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
        println!("\n=== Testing NOT IN parsing ===");
        println!("Parsing query: {query}");

        let mut parser = Parser::new(query);
        match parser.parse() {
            Ok(statement) => {
                println!("Parsed statement: {statement:#?}");
                // Assert the parsed structure instead of only printing it.
                let where_clause = statement
                    .where_clause
                    .expect("NOT IN query should parse with a WHERE clause");
                println!("WHERE conditions: {:#?}", where_clause.conditions);
                let first_condition = where_clause
                    .conditions
                    .first()
                    .expect("WHERE clause should contain at least one condition");
                println!("First condition expression: {:#?}", first_condition.expr);
            }
            Err(e) => {
                panic!("Parse error: {e}");
            }
        }
    }

    /// Builds a five-row `stock` fixture (`symbol`, `close`, `volume`).
    ///
    /// The `close` values are 99.5, 101.2, 103.5, 105.0 and 102.8, so
    /// COUNT = 5, MIN = 99.5, MAX = 105.0 and AVG = 102.4.
    fn create_test_stock_data() -> Arc<DataTable> {
        let mut table = DataTable::new("stock");

        table.add_column(DataColumn::new("symbol"));
        table.add_column(DataColumn::new("close"));
        table.add_column(DataColumn::new("volume"));

        let test_data = vec![
            ("AAPL", 99.5, 1000),
            ("AAPL", 101.2, 1500),
            ("AAPL", 103.5, 2000),
            ("AAPL", 105.0, 1200),
            ("AAPL", 102.8, 1800),
        ];

        for (symbol, close, volume) in test_data {
            table
                .add_row(DataRow::new(vec![
                    DataValue::String(symbol.to_string()),
                    DataValue::Float(close),
                    DataValue::Integer(volume),
                ]))
                .expect("Should add row successfully");
        }

        Arc::new(table)
    }
}
2464
// Additional QueryEngine tests are compiled only in test builds. They live in
// `query_engine_tests.rs`, which the `#[path]` attribute resolves relative to
// this source file's directory.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;