1use anyhow::{anyhow, Result};
2use fxhash::{FxHashMap, FxHashSet};
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::subquery_executor::SubqueryExecutor;
19use crate::data::virtual_table_generator::VirtualTableGenerator;
20use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
21use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
22use crate::sql::parser::ast::ColumnRef;
23use crate::sql::parser::ast::TableSource;
24use crate::sql::recursive_parser::{
25 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
26 TableFunction,
27};
28
/// SQL query engine that executes `SELECT` statements against a `DataTable`.
///
/// The engine stores configuration only; the table to query is passed to each
/// `execute*` call.
#[derive(Clone)]
pub struct QueryEngine {
    /// Case-sensitivity flag handed to the WHERE `EvaluationContext` and to the
    /// `HashJoinExecutor`.
    case_insensitive: bool,
    /// Date notation handed to every `ArithmeticEvaluator` the engine creates.
    date_notation: String,
    /// Behavior configuration supplied through `with_behavior_config`, if any.
    behavior_config: Option<BehaviorConfig>,
}
36
37impl Default for QueryEngine {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl QueryEngine {
44 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 case_insensitive: false,
48 date_notation: get_date_notation(),
49 behavior_config: None,
50 }
51 }
52
53 #[must_use]
54 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
55 let case_insensitive = config.case_insensitive_default;
56 let date_notation = get_date_notation();
58 Self {
59 case_insensitive,
60 date_notation,
61 behavior_config: Some(config),
62 }
63 }
64
65 #[must_use]
66 pub fn with_date_notation(_date_notation: String) -> Self {
67 Self {
68 case_insensitive: false,
69 date_notation: get_date_notation(), behavior_config: None,
71 }
72 }
73
74 #[must_use]
75 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
76 Self {
77 case_insensitive,
78 date_notation: get_date_notation(),
79 behavior_config: None,
80 }
81 }
82
83 #[must_use]
84 pub fn with_case_insensitive_and_date_notation(
85 case_insensitive: bool,
86 _date_notation: String, ) -> Self {
88 Self {
89 case_insensitive,
90 date_notation: get_date_notation(), behavior_config: None,
92 }
93 }
94
95 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
97 let columns = table.column_names();
98 let mut best_match: Option<(String, usize)> = None;
99
100 for col in columns {
101 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
102 let max_distance = if name.len() > 10 { 3 } else { 2 };
105 if distance <= max_distance {
106 match &best_match {
107 None => best_match = Some((col, distance)),
108 Some((_, best_dist)) if distance < *best_dist => {
109 best_match = Some((col, distance));
110 }
111 _ => {}
112 }
113 }
114 }
115
116 best_match.map(|(name, _)| name)
117 }
118
119 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
121 let len1 = s1.len();
122 let len2 = s2.len();
123 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
124
125 for i in 0..=len1 {
126 matrix[i][0] = i;
127 }
128 for j in 0..=len2 {
129 matrix[0][j] = j;
130 }
131
132 for (i, c1) in s1.chars().enumerate() {
133 for (j, c2) in s2.chars().enumerate() {
134 let cost = usize::from(c1 != c2);
135 matrix[i + 1][j + 1] = std::cmp::min(
136 matrix[i][j + 1] + 1, std::cmp::min(
138 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
141 );
142 }
143 }
144
145 matrix[len1][len2]
146 }
147
148 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
150 let (view, _plan) = self.execute_with_plan(table, sql)?;
151 Ok(view)
152 }
153
154 pub fn execute_statement(
156 &self,
157 table: Arc<DataTable>,
158 statement: SelectStatement,
159 ) -> Result<DataView> {
160 let mut cte_context = HashMap::new();
162 for cte in &statement.ctes {
163 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
164 let cte_result = match &cte.cte_type {
166 CTEType::Standard(query) => {
167 let view = self.build_view_with_context(
169 table.clone(),
170 query.clone(),
171 &mut cte_context,
172 )?;
173
174 let mut materialized = self.materialize_view(view)?;
176
177 for column in materialized.columns_mut() {
179 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
180 column.source_table = Some(cte.name.clone());
181 }
182
183 DataView::new(Arc::new(materialized))
184 }
185 CTEType::Web(web_spec) => {
186 use crate::web::http_fetcher::WebDataFetcher;
188
189 let fetcher = WebDataFetcher::new()?;
190 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
191
192 for column in data_table.columns_mut() {
194 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
195 column.source_table = Some(cte.name.clone());
196 }
197
198 DataView::new(Arc::new(data_table))
200 }
201 };
202 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
204 debug!(
205 "QueryEngine: CTE '{}' pre-processed, stored in context",
206 cte.name
207 );
208 }
209
210 let mut subquery_executor =
212 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
213 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
214
215 self.build_view_with_context(table, processed_statement, &mut cte_context)
217 }
218
219 pub fn execute_statement_with_cte_context(
221 &self,
222 table: Arc<DataTable>,
223 statement: SelectStatement,
224 cte_context: &HashMap<String, Arc<DataView>>,
225 ) -> Result<DataView> {
226 let mut local_context = cte_context.clone();
228
229 for cte in &statement.ctes {
231 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
232 let cte_result = match &cte.cte_type {
233 CTEType::Standard(query) => {
234 let view = self.build_view_with_context(
235 table.clone(),
236 query.clone(),
237 &mut local_context,
238 )?;
239
240 let mut materialized = self.materialize_view(view)?;
242
243 for column in materialized.columns_mut() {
245 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
246 column.source_table = Some(cte.name.clone());
247 }
248
249 DataView::new(Arc::new(materialized))
250 }
251 CTEType::Web(web_spec) => {
252 use crate::web::http_fetcher::WebDataFetcher;
254
255 let fetcher = WebDataFetcher::new()?;
256 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
257
258 for column in data_table.columns_mut() {
260 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
261 column.source_table = Some(cte.name.clone());
262 }
263
264 DataView::new(Arc::new(data_table))
266 }
267 };
268 local_context.insert(cte.name.clone(), Arc::new(cte_result));
269 }
270
271 let mut subquery_executor =
273 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
274 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
275
276 self.build_view_with_context(table, processed_statement, &mut local_context)
278 }
279
280 pub fn execute_with_plan(
282 &self,
283 table: Arc<DataTable>,
284 sql: &str,
285 ) -> Result<(DataView, ExecutionPlan)> {
286 let mut plan_builder = ExecutionPlanBuilder::new();
287 let start_time = Instant::now();
288
289 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
291 plan_builder.add_detail(format!("Query: {}", sql));
292 let mut parser = Parser::new(sql);
293 let statement = parser
294 .parse()
295 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
296 plan_builder.add_detail(format!("Parsed successfully"));
297 if let Some(from) = &statement.from_table {
298 plan_builder.add_detail(format!("FROM: {}", from));
299 }
300 if statement.where_clause.is_some() {
301 plan_builder.add_detail("WHERE clause present".to_string());
302 }
303 plan_builder.end_step();
304
305 let mut cte_context = HashMap::new();
307
308 if !statement.ctes.is_empty() {
309 plan_builder.begin_step(
310 StepType::CTE,
311 format!("Process {} CTEs", statement.ctes.len()),
312 );
313
314 for cte in &statement.ctes {
315 let cte_start = Instant::now();
316 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
317
318 let cte_result = match &cte.cte_type {
319 CTEType::Standard(query) => {
320 if let Some(from) = &query.from_table {
322 plan_builder.add_detail(format!("Source: {}", from));
323 }
324 if query.where_clause.is_some() {
325 plan_builder.add_detail("Has WHERE clause".to_string());
326 }
327 if query.group_by.is_some() {
328 plan_builder.add_detail("Has GROUP BY".to_string());
329 }
330
331 debug!(
332 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
333 cte.name,
334 cte_context.keys().collect::<Vec<_>>()
335 );
336
337 let mut subquery_executor = SubqueryExecutor::with_cte_context(
340 self.clone(),
341 table.clone(),
342 cte_context.clone(),
343 );
344 let processed_query = subquery_executor.execute_subqueries(query)?;
345
346 let view = self.build_view_with_context(
347 table.clone(),
348 processed_query,
349 &mut cte_context,
350 )?;
351
352 let mut materialized = self.materialize_view(view)?;
354
355 for column in materialized.columns_mut() {
357 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
358 column.source_table = Some(cte.name.clone());
359 }
360
361 DataView::new(Arc::new(materialized))
362 }
363 CTEType::Web(web_spec) => {
364 plan_builder.add_detail(format!("URL: {}", web_spec.url));
365 if let Some(format) = &web_spec.format {
366 plan_builder.add_detail(format!("Format: {:?}", format));
367 }
368 if let Some(cache) = web_spec.cache_seconds {
369 plan_builder.add_detail(format!("Cache: {} seconds", cache));
370 }
371
372 use crate::web::http_fetcher::WebDataFetcher;
374
375 let fetcher = WebDataFetcher::new()?;
376 let mut data_table = fetcher.fetch(web_spec, &cte.name)?;
377
378 for column in data_table.columns_mut() {
380 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
381 column.source_table = Some(cte.name.clone());
382 }
383
384 DataView::new(Arc::new(data_table))
386 }
387 };
388
389 plan_builder.set_rows_out(cte_result.row_count());
391 plan_builder.add_detail(format!(
392 "Result: {} rows, {} columns",
393 cte_result.row_count(),
394 cte_result.column_count()
395 ));
396 plan_builder.add_detail(format!(
397 "Execution time: {:.3}ms",
398 cte_start.elapsed().as_secs_f64() * 1000.0
399 ));
400
401 debug!(
402 "QueryEngine: Storing CTE '{}' in context with {} rows",
403 cte.name,
404 cte_result.row_count()
405 );
406 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
407 plan_builder.end_step();
408 }
409
410 plan_builder.add_detail(format!(
411 "All {} CTEs cached in context",
412 statement.ctes.len()
413 ));
414 plan_builder.end_step();
415 }
416
417 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
419 let mut subquery_executor =
420 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
421
422 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
424 format!("{:?}", w).contains("Subquery")
426 });
427
428 if has_subqueries {
429 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
430 }
431
432 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
433
434 if has_subqueries {
435 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
436 } else {
437 plan_builder.add_detail("No subqueries to process".to_string());
438 }
439
440 plan_builder.end_step();
441 let result = self.build_view_with_context_and_plan(
442 table,
443 processed_statement,
444 &mut cte_context,
445 &mut plan_builder,
446 )?;
447
448 let total_duration = start_time.elapsed();
449 info!(
450 "Query execution complete: total={:?}, rows={}",
451 total_duration,
452 result.row_count()
453 );
454
455 let plan = plan_builder.build();
456 Ok((result, plan))
457 }
458
459 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
461 let mut cte_context = HashMap::new();
462 self.build_view_with_context(table, statement, &mut cte_context)
463 }
464
465 fn build_view_with_context(
467 &self,
468 table: Arc<DataTable>,
469 statement: SelectStatement,
470 cte_context: &mut HashMap<String, Arc<DataView>>,
471 ) -> Result<DataView> {
472 let mut dummy_plan = ExecutionPlanBuilder::new();
473 self.build_view_with_context_and_plan(table, statement, cte_context, &mut dummy_plan)
474 }
475
/// Resolves the data source of `statement`, applies its JOINs, and hands the
/// resulting table to `build_view_internal_with_plan` for the rest of the query.
///
/// Steps, in order:
/// 1. Every `Standard` CTE of `statement` that is not yet in `cte_context` is
///    built, materialized, tagged with qualified column names and stored there.
/// 2. The source table is chosen: a table function (`RANGE`, `SPLIT`, or a
///    registered generator), a FROM subquery, a CTE named by FROM, or `table`.
/// 3. Each JOIN is executed with a `HashJoinExecutor`; join steps are recorded
///    in `plan`.
///
/// # Errors
/// Returns an error when a `Web` CTE is not already in `cte_context`, when a
/// RANGE argument is not numeric, when SPLIT text is NULL, when a generator
/// name is unknown, or when a JOIN names a table that is not a known CTE.
/// Errors from nested evaluation are propagated.
fn build_view_with_context_and_plan(
    &self,
    table: Arc<DataTable>,
    statement: SelectStatement,
    cte_context: &mut HashMap<String, Arc<DataView>>,
    plan: &mut ExecutionPlanBuilder,
) -> Result<DataView> {
    // Step 1: make sure every CTE of this statement is available in the context.
    for cte in &statement.ctes {
        // Callers may already have evaluated the CTE; do not evaluate it twice.
        if cte_context.contains_key(&cte.name) {
            debug!(
                "QueryEngine: CTE '{}' already in context, skipping",
                cte.name
            );
            continue;
        }

        debug!("QueryEngine: Processing CTE '{}'...", cte.name);
        debug!(
            "QueryEngine: Available CTEs for '{}': {:?}",
            cte.name,
            cte_context.keys().collect::<Vec<_>>()
        );

        let cte_result = match &cte.cte_type {
            CTEType::Standard(query) => {
                let view =
                    self.build_view_with_context(table.clone(), query.clone(), cte_context)?;

                let mut materialized = self.materialize_view(view)?;

                // Qualify the columns so `cte.column` references can be resolved.
                for column in materialized.columns_mut() {
                    column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                    column.source_table = Some(cte.name.clone());
                }

                DataView::new(Arc::new(materialized))
            }
            // Web CTEs are never fetched on this path; the public `execute*`
            // methods in this file fetch them and put them in the context first.
            CTEType::Web(_web_spec) => {
                return Err(anyhow!(
                    "Web CTEs should be processed in execute_select method"
                ));
            }
        };

        cte_context.insert(cte.name.clone(), Arc::new(cte_result));
        debug!(
            "QueryEngine: CTE '{}' processed, stored in context",
            cte.name
        );
    }

    // Step 2: choose the source table. Precedence: table function, FROM
    // subquery, FROM table name (CTE if known), then the caller's table.
    let source_table = if let Some(ref table_func) = statement.from_function {
        debug!("QueryEngine: Processing table function...");
        match table_func {
            TableFunction::Range { start, end, step } => {
                let mut evaluator =
                    ArithmeticEvaluator::with_date_notation(&table, self.date_notation.clone());

                // Arguments are evaluated against row index 0 of `table`.
                // Presumably they are constant expressions, so the row is irrelevant - confirm.
                let dummy_row = 0;

                let start_val = evaluator.evaluate(start, dummy_row)?;
                let end_val = evaluator.evaluate(end, dummy_row)?;
                let step_val = if let Some(step_expr) = step {
                    Some(evaluator.evaluate(step_expr, dummy_row)?)
                } else {
                    None
                };

                // Floats are converted with `as i64` (fractional part dropped).
                let start_int = match start_val {
                    DataValue::Integer(i) => i,
                    DataValue::Float(f) => f as i64,
                    _ => return Err(anyhow!("RANGE start must be numeric")),
                };

                let end_int = match end_val {
                    DataValue::Integer(i) => i,
                    DataValue::Float(f) => f as i64,
                    _ => return Err(anyhow!("RANGE end must be numeric")),
                };

                let step_int = if let Some(step) = step_val {
                    match step {
                        DataValue::Integer(i) => Some(i),
                        DataValue::Float(f) => Some(f as i64),
                        _ => return Err(anyhow!("RANGE step must be numeric")),
                    }
                } else {
                    None
                };

                VirtualTableGenerator::generate_range(start_int, end_int, step_int, None)?
            }
            TableFunction::Split { text, delimiter } => {
                let mut evaluator =
                    ArithmeticEvaluator::with_date_notation(&table, self.date_notation.clone());

                // Arguments are evaluated against row index 0 of `table`.
                let dummy_row = 0;

                let text_val = evaluator.evaluate(text, dummy_row)?;
                let delimiter_val = if let Some(delim_expr) = delimiter {
                    Some(evaluator.evaluate(delim_expr, dummy_row)?)
                } else {
                    None
                };

                // Non-string text is converted with `to_string()`; NULL is rejected.
                let text_str = match text_val {
                    DataValue::String(s) => s,
                    DataValue::Null => return Err(anyhow!("SPLIT text cannot be NULL")),
                    _ => text_val.to_string(),
                };

                // A NULL delimiter is treated the same as no delimiter.
                let delimiter_str = if let Some(delim) = delimiter_val {
                    match delim {
                        DataValue::String(s) => Some(s),
                        DataValue::Null => None,
                        _ => Some(delim.to_string()),
                    }
                } else {
                    None
                };

                VirtualTableGenerator::generate_split(
                    &text_str,
                    delimiter_str.as_deref(),
                    None,
                )?
            }
            TableFunction::Generator { name, args } => {
                use crate::sql::generators::GeneratorRegistry;

                // A new registry is created for each call.
                let registry = GeneratorRegistry::new();

                if let Some(generator) = registry.get(name) {
                    let mut evaluator = ArithmeticEvaluator::with_date_notation(
                        &table,
                        self.date_notation.clone(),
                    );
                    let dummy_row = 0;

                    let mut evaluated_args = Vec::new();
                    for arg in args {
                        evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
                    }

                    generator.generate(evaluated_args)?
                } else {
                    return Err(anyhow!("Unknown generator function: {}", name));
                }
            }
        }
    } else if let Some(ref subquery) = statement.from_subquery {
        debug!("QueryEngine: Processing FROM subquery...");
        let subquery_result =
            self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;

        // Copy the visible rows and columns into a standalone table.
        let materialized = self.materialize_view(subquery_result)?;
        Arc::new(materialized)
    } else if let Some(ref table_name) = statement.from_table {
        if let Some(cte_view) = cte_context.get(table_name) {
            debug!("QueryEngine: Using CTE '{}' as source table", table_name);
            let materialized = self.materialize_view((**cte_view).clone())?;
            Arc::new(materialized)
        } else {
            // Any name that is not a known CTE resolves to the caller's table.
            table.clone()
        }
    } else {
        table.clone()
    };

    // Step 3: apply JOINs left to right, feeding each result into the next join.
    let final_table = if !statement.joins.is_empty() {
        plan.begin_step(
            StepType::Join,
            format!("Process {} JOINs", statement.joins.len()),
        );
        plan.set_rows_in(source_table.row_count());

        let join_executor = HashJoinExecutor::new(self.case_insensitive);
        let mut current_table = source_table;

        for (idx, join_clause) in statement.joins.iter().enumerate() {
            let join_start = Instant::now();
            plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
            plan.add_detail(format!("Type: {:?}", join_clause.join_type));
            plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
            plan.add_detail(format!(
                "Executing {:?} JOIN on {}",
                join_clause.join_type, join_clause.condition.left_column
            ));

            let right_table = match &join_clause.table {
                // A plain table name on the right side must be a known CTE.
                TableSource::Table(name) => {
                    if let Some(cte_view) = cte_context.get(name) {
                        let materialized = self.materialize_view((**cte_view).clone())?;
                        Arc::new(materialized)
                    } else {
                        return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
                    }
                }
                // A derived table is evaluated as a subquery over the caller's table.
                TableSource::DerivedTable { query, alias: _ } => {
                    let subquery_result = self.build_view_with_context(
                        table.clone(),
                        *query.clone(),
                        cte_context,
                    )?;
                    let materialized = self.materialize_view(subquery_result)?;
                    Arc::new(materialized)
                }
            };

            let joined = join_executor.execute_join(
                current_table.clone(),
                join_clause,
                right_table.clone(),
            )?;

            plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
            plan.set_rows_out(joined.row_count());
            plan.add_detail(format!("Result: {} rows", joined.row_count()));
            plan.add_detail(format!(
                "Join time: {:.3}ms",
                join_start.elapsed().as_secs_f64() * 1000.0
            ));
            plan.end_step();

            current_table = Arc::new(joined);
        }

        plan.set_rows_out(current_table.row_count());
        plan.add_detail(format!(
            "Final result after all joins: {} rows",
            current_table.row_count()
        ));
        plan.end_step();
        current_table
    } else {
        source_table
    };

    // WHERE, GROUP BY, projection, DISTINCT, ORDER BY and LIMIT happen here.
    self.build_view_internal_with_plan(final_table, statement, plan)
}
753
754 fn materialize_view(&self, view: DataView) -> Result<DataTable> {
756 let source = view.source();
757 let mut result_table = DataTable::new("derived");
758
759 let visible_cols = view.visible_column_indices().to_vec();
761
762 for col_idx in &visible_cols {
764 let col = &source.columns[*col_idx];
765 let new_col = DataColumn {
766 name: col.name.clone(),
767 data_type: col.data_type.clone(),
768 nullable: col.nullable,
769 unique_values: col.unique_values,
770 null_count: col.null_count,
771 metadata: col.metadata.clone(),
772 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
775 result_table.add_column(new_col);
776 }
777
778 for row_idx in view.visible_row_indices() {
780 let source_row = &source.rows[*row_idx];
781 let mut new_row = DataRow { values: Vec::new() };
782
783 for col_idx in &visible_cols {
784 new_row.values.push(source_row.values[*col_idx].clone());
785 }
786
787 result_table.add_row(new_row);
788 }
789
790 Ok(result_table)
791 }
792
793 fn build_view_internal(
794 &self,
795 table: Arc<DataTable>,
796 statement: SelectStatement,
797 ) -> Result<DataView> {
798 let mut dummy_plan = ExecutionPlanBuilder::new();
799 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
800 }
801
/// Runs the single-table part of a query over `table` and records each stage
/// in `plan`.
///
/// Stages, in order: WHERE filtering, GROUP BY (which also receives the SELECT
/// items and HAVING) or plain SELECT projection, DISTINCT, ORDER BY, and
/// LIMIT/OFFSET. A stage is skipped when the statement does not use it.
///
/// # Errors
/// Returns the first error raised while evaluating the WHERE clause for a row,
/// and propagates errors from grouping, projection, DISTINCT and sorting.
fn build_view_internal_with_plan(
    &self,
    table: Arc<DataTable>,
    statement: SelectStatement,
    plan: &mut ExecutionPlanBuilder,
) -> Result<DataView> {
    debug!(
        "QueryEngine::build_view - select_items: {:?}",
        statement.select_items
    );
    debug!(
        "QueryEngine::build_view - where_clause: {:?}",
        statement.where_clause
    );

    // Start with every row of the table visible.
    let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();

    // Stage 1: WHERE.
    if let Some(where_clause) = &statement.where_clause {
        let total_rows = table.row_count();
        debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
        debug!("QueryEngine: WHERE clause = {:?}", where_clause);

        plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
        plan.set_rows_in(total_rows);
        plan.add_detail(format!("Input: {} rows", total_rows));

        for condition in &where_clause.conditions {
            plan.add_detail(format!("Condition: {:?}", condition.expr));
        }

        let filter_start = Instant::now();
        // One context is shared by all rows; its stats are logged below.
        let mut eval_context = EvaluationContext::new(self.case_insensitive);

        let mut filtered_rows = Vec::new();
        for row_idx in visible_rows {
            // Per-row debug logging is limited to row indices 0, 1 and 2.
            if row_idx < 3 {
                debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
            }
            let mut evaluator =
                RecursiveWhereEvaluator::with_context(&table, &mut eval_context);
            match evaluator.evaluate(where_clause, row_idx) {
                Ok(result) => {
                    if row_idx < 3 {
                        debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
                    }
                    if result {
                        filtered_rows.push(row_idx);
                    }
                }
                Err(e) => {
                    if row_idx < 3 {
                        debug!(
                            "QueryEngine: WHERE evaluation error for row {}: {}",
                            row_idx, e
                        );
                    }
                    // The first evaluation error aborts the whole query.
                    return Err(e);
                }
            }
        }

        let (compilations, cache_hits) = eval_context.get_stats();
        if compilations > 0 || cache_hits > 0 {
            debug!(
                "LIKE pattern cache: {} compilations, {} cache hits",
                compilations, cache_hits
            );
        }
        visible_rows = filtered_rows;
        let filter_duration = filter_start.elapsed();
        info!(
            "WHERE clause filtering: {} rows -> {} rows in {:?}",
            total_rows,
            visible_rows.len(),
            filter_duration
        );

        plan.set_rows_out(visible_rows.len());
        plan.add_detail(format!("Output: {} rows", visible_rows.len()));
        plan.add_detail(format!(
            "Filter time: {:.3}ms",
            filter_duration.as_secs_f64() * 1000.0
        ));
        plan.end_step();
    }

    let mut view = DataView::new(table.clone());
    view = view.with_rows(visible_rows);

    // Stage 2: GROUP BY, or plain projection when there is no GROUP BY.
    // NOTE(review): when `group_by` is `Some` but empty, neither grouping nor
    // projection is applied.
    if let Some(group_by_exprs) = &statement.group_by {
        if !group_by_exprs.is_empty() {
            debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);

            plan.begin_step(
                StepType::GroupBy,
                format!("GROUP BY {} expressions", group_by_exprs.len()),
            );
            plan.set_rows_in(view.row_count());
            plan.add_detail(format!("Input: {} rows", view.row_count()));
            for expr in group_by_exprs {
                plan.add_detail(format!("Group by: {:?}", expr));
            }

            let group_start = Instant::now();
            view = self.apply_group_by(
                view,
                group_by_exprs,
                &statement.select_items,
                statement.having.as_ref(),
            )?;

            plan.set_rows_out(view.row_count());
            plan.add_detail(format!("Output: {} groups", view.row_count()));
            plan.add_detail(format!(
                "Group time: {:.3}ms",
                group_start.elapsed().as_secs_f64() * 1000.0
            ));
            plan.end_step();
        }
    } else {
        if !statement.select_items.is_empty() {
            let has_non_star_items = statement
                .select_items
                .iter()
                .any(|item| !matches!(item, SelectItem::Star));

            // Project unless the select list is exactly one `*`.
            if has_non_star_items || statement.select_items.len() > 1 {
                view = self.apply_select_items(view, &statement.select_items)?;
            } else {
                // A lone `*`: all columns stay visible, nothing to do.
            }
        } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
            // Fallback for statements that carry plain column names only.
            debug!("QueryEngine: Using legacy columns path");
            let source_table = view.source();
            let column_indices =
                self.resolve_column_indices(source_table, &statement.columns)?;
            view = view.with_columns(column_indices);
        }
    }

    // Stage 3: DISTINCT.
    if statement.distinct {
        plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
        plan.set_rows_in(view.row_count());
        plan.add_detail(format!("Input: {} rows", view.row_count()));

        let distinct_start = Instant::now();
        view = self.apply_distinct(view)?;

        plan.set_rows_out(view.row_count());
        plan.add_detail(format!("Output: {} unique rows", view.row_count()));
        plan.add_detail(format!(
            "Distinct time: {:.3}ms",
            distinct_start.elapsed().as_secs_f64() * 1000.0
        ));
        plan.end_step();
    }

    // Stage 4: ORDER BY.
    if let Some(order_by_columns) = &statement.order_by {
        if !order_by_columns.is_empty() {
            plan.begin_step(
                StepType::Sort,
                format!("ORDER BY {} columns", order_by_columns.len()),
            );
            plan.set_rows_in(view.row_count());
            for col in order_by_columns {
                plan.add_detail(format!("{} {:?}", col.column, col.direction));
            }

            let sort_start = Instant::now();
            view = self.apply_multi_order_by(view, order_by_columns)?;

            plan.add_detail(format!(
                "Sort time: {:.3}ms",
                sort_start.elapsed().as_secs_f64() * 1000.0
            ));
            plan.end_step();
        }
    }

    // Stage 5: LIMIT / OFFSET.
    // NOTE(review): `statement.offset` is only read when `limit` is `Some`, so
    // an OFFSET without a LIMIT has no effect here - confirm this is intended.
    if let Some(limit) = statement.limit {
        let offset = statement.offset.unwrap_or(0);
        plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
        plan.set_rows_in(view.row_count());
        if offset > 0 {
            plan.add_detail(format!("OFFSET: {}", offset));
        }
        view = view.with_limit(limit, offset);
        plan.set_rows_out(view.row_count());
        plan.add_detail(format!("Output: {} rows", view.row_count()));
        plan.end_step();
    }

    Ok(view)
}
1016
1017 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1019 let mut indices = Vec::new();
1020 let table_columns = table.column_names();
1021
1022 for col_name in columns {
1023 let index = table_columns
1024 .iter()
1025 .position(|c| c.eq_ignore_ascii_case(col_name))
1026 .ok_or_else(|| {
1027 let suggestion = self.find_similar_column(table, col_name);
1028 match suggestion {
1029 Some(similar) => anyhow::anyhow!(
1030 "Column '{}' not found. Did you mean '{}'?",
1031 col_name,
1032 similar
1033 ),
1034 None => anyhow::anyhow!("Column '{}' not found", col_name),
1035 }
1036 })?;
1037 indices.push(index);
1038 }
1039
1040 Ok(indices)
1041 }
1042
/// Applies the SELECT list to `view`.
///
/// Three paths, checked in this order:
/// 1. Aggregate query: at least one item contains an aggregate, every item is
///    an aggregate-compatible expression, and the view has rows. Delegates to
///    `apply_aggregate_select`.
/// 2. Columns only (no `Expression` items): the view is narrowed with
///    `with_columns`; no data is copied here.
/// 3. Otherwise a new `"query_result"` table is computed row by row, with `*`
///    expanded to all source columns, and returned as a fresh view.
///
/// # Errors
/// Returns an error when a referenced column cannot be found (with a
/// "Did you mean ...?" hint when available), when a row or cell is missing,
/// when expression evaluation fails, or when a computed row cannot be added.
fn apply_select_items(&self, view: DataView, select_items: &[SelectItem]) -> Result<DataView> {
    debug!(
        "QueryEngine::apply_select_items - items: {:?}",
        select_items
    );
    debug!(
        "QueryEngine::apply_select_items - input view has {} rows",
        view.row_count()
    );

    // True when any expression item contains an aggregate.
    let has_aggregates = select_items.iter().any(|item| match item {
        SelectItem::Expression { expr, .. } => contains_aggregate(expr),
        SelectItem::Column(_) => false,
        SelectItem::Star => false,
    });

    // True only when every item is an expression that is aggregate-compatible;
    // a plain column or `*` rules out the aggregate path.
    let all_aggregate_compatible = select_items.iter().all(|item| match item {
        SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
        SelectItem::Column(_) => false,
        SelectItem::Star => false,
    });

    // Path 1: aggregate query. An empty view does not take this path.
    if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
        debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
        return self.apply_aggregate_select(view, select_items);
    }

    let has_computed_expressions = select_items
        .iter()
        .any(|item| matches!(item, SelectItem::Expression { .. }));

    debug!(
        "QueryEngine::apply_select_items - has_computed_expressions: {}",
        has_computed_expressions
    );

    // Path 2: columns only, so just narrow the view.
    if !has_computed_expressions {
        let column_indices = self.resolve_select_columns(view.source(), select_items)?;
        return Ok(view.with_columns(column_indices));
    }

    // Path 3: build a new table holding the computed values.
    let source_table = view.source();
    let visible_rows = view.visible_row_indices();

    let mut computed_table = DataTable::new("query_result");

    // Expand `*` into one column item per source column.
    let mut expanded_items = Vec::new();
    for item in select_items {
        match item {
            SelectItem::Star => {
                for col_name in source_table.column_names() {
                    expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
                        col_name.to_string(),
                    )));
                }
            }
            _ => expanded_items.push(item.clone()),
        }
    }

    // Counts how often each output name has been used so far.
    let mut column_name_counts: std::collections::HashMap<String, usize> =
        std::collections::HashMap::new();

    for item in &expanded_items {
        let base_name = match item {
            SelectItem::Column(col_ref) => col_ref.name.clone(),
            SelectItem::Expression { alias, .. } => alias.clone(),
            SelectItem::Star => unreachable!("Star should have been expanded"),
        };

        // The first use keeps the plain name; repeats get `_1`, `_2`, ...
        let count = column_name_counts.entry(base_name.clone()).or_insert(0);
        let column_name = if *count == 0 {
            base_name.clone()
        } else {
            format!("{base_name}_{count}")
        };
        *count += 1;

        computed_table.add_column(DataColumn::new(&column_name));
    }

    let mut evaluator =
        ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());

    for &row_idx in visible_rows {
        let mut row_values = Vec::new();

        for item in &expanded_items {
            let value = match item {
                SelectItem::Column(col_ref) => {
                    // Column lookup is repeated for every row; with zero
                    // visible rows an unknown column is never reported.
                    let col_idx = if let Some(table_prefix) = &col_ref.table_prefix {
                        // `prefix.name` is looked up by qualified name only.
                        let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
                        let result = source_table.find_column_by_qualified_name(&qualified_name);

                        if result.is_none() {
                            // Log what is available to help diagnose the miss.
                            let available_qualified: Vec<String> = source_table.columns.iter()
                                .filter_map(|c| c.qualified_name.clone())
                                .collect();
                            let available_simple: Vec<String> = source_table.columns.iter()
                                .map(|c| c.name.clone())
                                .collect();
                            debug!(
                                "Qualified column '{}' not found. Available qualified names: {:?}, Simple names: {:?}",
                                qualified_name, available_qualified, available_simple
                            );
                        }
                        result
                    } else {
                        source_table.get_column_index(&col_ref.name)
                    }
                    .ok_or_else(|| {
                        let display_name = if let Some(prefix) = &col_ref.table_prefix {
                            format!("{}.{}", prefix, col_ref.name)
                        } else {
                            col_ref.name.clone()
                        };
                        // The hint is searched with the unqualified name.
                        let suggestion = self.find_similar_column(source_table, &col_ref.name);
                        match suggestion {
                            Some(similar) => anyhow::anyhow!(
                                "Column '{}' not found. Did you mean '{}'?",
                                display_name,
                                similar
                            ),
                            None => anyhow::anyhow!("Column '{}' not found", display_name),
                        }
                    })?;
                    let row = source_table
                        .get_row(row_idx)
                        .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
                    row.get(col_idx)
                        .ok_or_else(|| anyhow::anyhow!("Column {} not found in row", col_idx))?
                        .clone()
                }
                SelectItem::Expression { expr, .. } => {
                    evaluator.evaluate(expr, row_idx)?
                }
                SelectItem::Star => unreachable!("Star should have been expanded"),
            };
            row_values.push(value);
        }

        computed_table
            .add_row(DataRow::new(row_values))
            .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
    }

    // The result is a view over the new table, not over the original source.
    Ok(DataView::new(Arc::new(computed_table)))
}
1220
1221 fn apply_aggregate_select(
1223 &self,
1224 view: DataView,
1225 select_items: &[SelectItem],
1226 ) -> Result<DataView> {
1227 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1228
1229 let source_table = view.source();
1230 let mut result_table = DataTable::new("aggregate_result");
1231
1232 for item in select_items {
1234 let column_name = match item {
1235 SelectItem::Expression { alias, .. } => alias.clone(),
1236 _ => unreachable!("Should only have expressions in aggregate-only query"),
1237 };
1238 result_table.add_column(DataColumn::new(&column_name));
1239 }
1240
1241 let visible_rows = view.visible_row_indices().to_vec();
1243 let mut evaluator =
1244 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1245 .with_visible_rows(visible_rows);
1246
1247 let mut row_values = Vec::new();
1249 for item in select_items {
1250 match item {
1251 SelectItem::Expression { expr, .. } => {
1252 let value = evaluator.evaluate(expr, 0)?;
1255 row_values.push(value);
1256 }
1257 _ => unreachable!("Should only have expressions in aggregate-only query"),
1258 }
1259 }
1260
1261 result_table
1263 .add_row(DataRow::new(row_values))
1264 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1265
1266 Ok(DataView::new(Arc::new(result_table)))
1267 }
1268
1269 fn resolve_select_columns(
1271 &self,
1272 table: &DataTable,
1273 select_items: &[SelectItem],
1274 ) -> Result<Vec<usize>> {
1275 let mut indices = Vec::new();
1276 let table_columns = table.column_names();
1277
1278 for item in select_items {
1279 match item {
1280 SelectItem::Column(col_ref) => {
1281 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1283 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1285 table.find_column_by_qualified_name(&qualified_name)
1286 .ok_or_else(|| {
1287 let has_qualified = table.columns.iter()
1289 .any(|c| c.qualified_name.is_some());
1290 if !has_qualified {
1291 anyhow::anyhow!(
1292 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1293 qualified_name, table_prefix
1294 )
1295 } else {
1296 anyhow::anyhow!("Column '{}' not found", qualified_name)
1297 }
1298 })?
1299 } else {
1300 table_columns
1302 .iter()
1303 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1304 .ok_or_else(|| {
1305 let suggestion = self.find_similar_column(table, &col_ref.name);
1306 match suggestion {
1307 Some(similar) => anyhow::anyhow!(
1308 "Column '{}' not found. Did you mean '{}'?",
1309 col_ref.name,
1310 similar
1311 ),
1312 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1313 }
1314 })?
1315 };
1316 indices.push(index);
1317 }
1318 SelectItem::Star => {
1319 for i in 0..table_columns.len() {
1321 indices.push(i);
1322 }
1323 }
1324 SelectItem::Expression { .. } => {
1325 return Err(anyhow::anyhow!(
1326 "Computed expressions require new table creation"
1327 ));
1328 }
1329 }
1330 }
1331
1332 Ok(indices)
1333 }
1334
1335 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1337 use std::collections::HashSet;
1338
1339 let source = view.source();
1340 let visible_cols = view.visible_column_indices();
1341 let visible_rows = view.visible_row_indices();
1342
1343 let mut seen_rows = HashSet::new();
1345 let mut unique_row_indices = Vec::new();
1346
1347 for &row_idx in visible_rows {
1348 let mut row_key = Vec::new();
1350 for &col_idx in visible_cols {
1351 let value = source
1352 .get_value(row_idx, col_idx)
1353 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1354 row_key.push(format!("{:?}", value));
1356 }
1357
1358 if seen_rows.insert(row_key) {
1360 unique_row_indices.push(row_idx);
1362 }
1363 }
1364
1365 Ok(view.with_rows(unique_row_indices))
1367 }
1368
1369 fn apply_multi_order_by(
1371 &self,
1372 mut view: DataView,
1373 order_by_columns: &[OrderByColumn],
1374 ) -> Result<DataView> {
1375 let mut sort_columns = Vec::new();
1377
1378 for order_col in order_by_columns {
1379 let col_index = view
1383 .source()
1384 .get_column_index(&order_col.column)
1385 .ok_or_else(|| {
1386 let suggestion = self.find_similar_column(view.source(), &order_col.column);
1388 match suggestion {
1389 Some(similar) => anyhow::anyhow!(
1390 "Column '{}' not found. Did you mean '{}'?",
1391 order_col.column,
1392 similar
1393 ),
1394 None => {
1395 let available_cols = view.source().column_names().join(", ");
1397 anyhow::anyhow!(
1398 "Column '{}' not found. Available columns: {}",
1399 order_col.column,
1400 available_cols
1401 )
1402 }
1403 }
1404 })?;
1405
1406 let ascending = matches!(order_col.direction, SortDirection::Asc);
1407 sort_columns.push((col_index, ascending));
1408 }
1409
1410 view.apply_multi_sort(&sort_columns)?;
1412 Ok(view)
1413 }
1414
1415 fn apply_group_by(
1417 &self,
1418 view: DataView,
1419 group_by_exprs: &[SqlExpression],
1420 select_items: &[SelectItem],
1421 having: Option<&SqlExpression>,
1422 ) -> Result<DataView> {
1423 self.apply_group_by_expressions(
1425 view,
1426 group_by_exprs,
1427 select_items,
1428 having,
1429 self.case_insensitive,
1430 self.date_notation.clone(),
1431 )
1432 }
1433
1434 pub fn estimate_group_cardinality(
1437 &self,
1438 view: &DataView,
1439 group_by_exprs: &[SqlExpression],
1440 ) -> usize {
1441 let row_count = view.get_visible_rows().len();
1443 if row_count <= 100 {
1444 return row_count;
1445 }
1446
1447 let sample_size = min(1000, row_count / 10).max(100);
1449 let mut seen = FxHashSet::default();
1450
1451 let visible_rows = view.get_visible_rows();
1452 for (i, &row_idx) in visible_rows.iter().enumerate() {
1453 if i >= sample_size {
1454 break;
1455 }
1456
1457 let mut key_values = Vec::new();
1459 for expr in group_by_exprs {
1460 let mut evaluator = ArithmeticEvaluator::new(view.source());
1461 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
1462 key_values.push(value);
1463 }
1464
1465 seen.insert(key_values);
1466 }
1467
1468 let sample_cardinality = seen.len();
1470 let estimated = (sample_cardinality * row_count) / sample_size;
1471
1472 estimated.min(row_count).max(sample_cardinality)
1474 }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479 use super::*;
1480 use crate::data::datatable::{DataColumn, DataRow, DataValue};
1481
1482 fn create_test_table() -> Arc<DataTable> {
1483 let mut table = DataTable::new("test");
1484
1485 table.add_column(DataColumn::new("id"));
1487 table.add_column(DataColumn::new("name"));
1488 table.add_column(DataColumn::new("age"));
1489
1490 table
1492 .add_row(DataRow::new(vec![
1493 DataValue::Integer(1),
1494 DataValue::String("Alice".to_string()),
1495 DataValue::Integer(30),
1496 ]))
1497 .unwrap();
1498
1499 table
1500 .add_row(DataRow::new(vec![
1501 DataValue::Integer(2),
1502 DataValue::String("Bob".to_string()),
1503 DataValue::Integer(25),
1504 ]))
1505 .unwrap();
1506
1507 table
1508 .add_row(DataRow::new(vec![
1509 DataValue::Integer(3),
1510 DataValue::String("Charlie".to_string()),
1511 DataValue::Integer(35),
1512 ]))
1513 .unwrap();
1514
1515 Arc::new(table)
1516 }
1517
1518 #[test]
1519 fn test_select_all() {
1520 let table = create_test_table();
1521 let engine = QueryEngine::new();
1522
1523 let view = engine
1524 .execute(table.clone(), "SELECT * FROM users")
1525 .unwrap();
1526 assert_eq!(view.row_count(), 3);
1527 assert_eq!(view.column_count(), 3);
1528 }
1529
1530 #[test]
1531 fn test_select_columns() {
1532 let table = create_test_table();
1533 let engine = QueryEngine::new();
1534
1535 let view = engine
1536 .execute(table.clone(), "SELECT name, age FROM users")
1537 .unwrap();
1538 assert_eq!(view.row_count(), 3);
1539 assert_eq!(view.column_count(), 2);
1540 }
1541
1542 #[test]
1543 fn test_select_with_limit() {
1544 let table = create_test_table();
1545 let engine = QueryEngine::new();
1546
1547 let view = engine
1548 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
1549 .unwrap();
1550 assert_eq!(view.row_count(), 2);
1551 }
1552
1553 #[test]
1554 fn test_type_coercion_contains() {
1555 let _ = tracing_subscriber::fmt()
1557 .with_max_level(tracing::Level::DEBUG)
1558 .try_init();
1559
1560 let mut table = DataTable::new("test");
1561 table.add_column(DataColumn::new("id"));
1562 table.add_column(DataColumn::new("status"));
1563 table.add_column(DataColumn::new("price"));
1564
1565 table
1567 .add_row(DataRow::new(vec![
1568 DataValue::Integer(1),
1569 DataValue::String("Pending".to_string()),
1570 DataValue::Float(99.99),
1571 ]))
1572 .unwrap();
1573
1574 table
1575 .add_row(DataRow::new(vec![
1576 DataValue::Integer(2),
1577 DataValue::String("Confirmed".to_string()),
1578 DataValue::Float(150.50),
1579 ]))
1580 .unwrap();
1581
1582 table
1583 .add_row(DataRow::new(vec![
1584 DataValue::Integer(3),
1585 DataValue::String("Pending".to_string()),
1586 DataValue::Float(75.00),
1587 ]))
1588 .unwrap();
1589
1590 let table = Arc::new(table);
1591 let engine = QueryEngine::new();
1592
1593 println!("\n=== Testing WHERE clause with Contains ===");
1594 println!("Table has {} rows", table.row_count());
1595 for i in 0..table.row_count() {
1596 let status = table.get_value(i, 1);
1597 println!("Row {i}: status = {status:?}");
1598 }
1599
1600 println!("\n--- Test 1: status.Contains('pend') ---");
1602 let result = engine.execute(
1603 table.clone(),
1604 "SELECT * FROM test WHERE status.Contains('pend')",
1605 );
1606 match result {
1607 Ok(view) => {
1608 println!("SUCCESS: Found {} matching rows", view.row_count());
1609 assert_eq!(view.row_count(), 2); }
1611 Err(e) => {
1612 panic!("Query failed: {e}");
1613 }
1614 }
1615
1616 println!("\n--- Test 2: price.Contains('9') ---");
1618 let result = engine.execute(
1619 table.clone(),
1620 "SELECT * FROM test WHERE price.Contains('9')",
1621 );
1622 match result {
1623 Ok(view) => {
1624 println!(
1625 "SUCCESS: Found {} matching rows with price containing '9'",
1626 view.row_count()
1627 );
1628 assert!(view.row_count() >= 1);
1630 }
1631 Err(e) => {
1632 panic!("Numeric coercion query failed: {e}");
1633 }
1634 }
1635
1636 println!("\n=== All tests passed! ===");
1637 }
1638
1639 #[test]
1640 fn test_not_in_clause() {
1641 let _ = tracing_subscriber::fmt()
1643 .with_max_level(tracing::Level::DEBUG)
1644 .try_init();
1645
1646 let mut table = DataTable::new("test");
1647 table.add_column(DataColumn::new("id"));
1648 table.add_column(DataColumn::new("country"));
1649
1650 table
1652 .add_row(DataRow::new(vec![
1653 DataValue::Integer(1),
1654 DataValue::String("CA".to_string()),
1655 ]))
1656 .unwrap();
1657
1658 table
1659 .add_row(DataRow::new(vec![
1660 DataValue::Integer(2),
1661 DataValue::String("US".to_string()),
1662 ]))
1663 .unwrap();
1664
1665 table
1666 .add_row(DataRow::new(vec![
1667 DataValue::Integer(3),
1668 DataValue::String("UK".to_string()),
1669 ]))
1670 .unwrap();
1671
1672 let table = Arc::new(table);
1673 let engine = QueryEngine::new();
1674
1675 println!("\n=== Testing NOT IN clause ===");
1676 println!("Table has {} rows", table.row_count());
1677 for i in 0..table.row_count() {
1678 let country = table.get_value(i, 1);
1679 println!("Row {i}: country = {country:?}");
1680 }
1681
1682 println!("\n--- Test: country NOT IN ('CA') ---");
1684 let result = engine.execute(
1685 table.clone(),
1686 "SELECT * FROM test WHERE country NOT IN ('CA')",
1687 );
1688 match result {
1689 Ok(view) => {
1690 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
1691 assert_eq!(view.row_count(), 2); }
1693 Err(e) => {
1694 panic!("NOT IN query failed: {e}");
1695 }
1696 }
1697
1698 println!("\n=== NOT IN test complete! ===");
1699 }
1700
1701 #[test]
1702 fn test_case_insensitive_in_and_not_in() {
1703 let _ = tracing_subscriber::fmt()
1705 .with_max_level(tracing::Level::DEBUG)
1706 .try_init();
1707
1708 let mut table = DataTable::new("test");
1709 table.add_column(DataColumn::new("id"));
1710 table.add_column(DataColumn::new("country"));
1711
1712 table
1714 .add_row(DataRow::new(vec![
1715 DataValue::Integer(1),
1716 DataValue::String("CA".to_string()), ]))
1718 .unwrap();
1719
1720 table
1721 .add_row(DataRow::new(vec![
1722 DataValue::Integer(2),
1723 DataValue::String("us".to_string()), ]))
1725 .unwrap();
1726
1727 table
1728 .add_row(DataRow::new(vec![
1729 DataValue::Integer(3),
1730 DataValue::String("UK".to_string()), ]))
1732 .unwrap();
1733
1734 let table = Arc::new(table);
1735
1736 println!("\n=== Testing Case-Insensitive IN clause ===");
1737 println!("Table has {} rows", table.row_count());
1738 for i in 0..table.row_count() {
1739 let country = table.get_value(i, 1);
1740 println!("Row {i}: country = {country:?}");
1741 }
1742
1743 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
1745 let engine = QueryEngine::with_case_insensitive(true);
1746 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1747 match result {
1748 Ok(view) => {
1749 println!(
1750 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
1751 view.row_count()
1752 );
1753 assert_eq!(view.row_count(), 1); }
1755 Err(e) => {
1756 panic!("Case-insensitive IN query failed: {e}");
1757 }
1758 }
1759
1760 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
1762 let result = engine.execute(
1763 table.clone(),
1764 "SELECT * FROM test WHERE country NOT IN ('ca')",
1765 );
1766 match result {
1767 Ok(view) => {
1768 println!(
1769 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
1770 view.row_count()
1771 );
1772 assert_eq!(view.row_count(), 2); }
1774 Err(e) => {
1775 panic!("Case-insensitive NOT IN query failed: {e}");
1776 }
1777 }
1778
1779 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
1781 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
1783 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
1784 match result {
1785 Ok(view) => {
1786 println!(
1787 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
1788 view.row_count()
1789 );
1790 assert_eq!(view.row_count(), 0); }
1792 Err(e) => {
1793 panic!("Case-sensitive IN query failed: {e}");
1794 }
1795 }
1796
1797 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
1798 }
1799
1800 #[test]
1801 #[ignore = "Parentheses in WHERE clause not yet implemented"]
1802 fn test_parentheses_in_where_clause() {
1803 let _ = tracing_subscriber::fmt()
1805 .with_max_level(tracing::Level::DEBUG)
1806 .try_init();
1807
1808 let mut table = DataTable::new("test");
1809 table.add_column(DataColumn::new("id"));
1810 table.add_column(DataColumn::new("status"));
1811 table.add_column(DataColumn::new("priority"));
1812
1813 table
1815 .add_row(DataRow::new(vec![
1816 DataValue::Integer(1),
1817 DataValue::String("Pending".to_string()),
1818 DataValue::String("High".to_string()),
1819 ]))
1820 .unwrap();
1821
1822 table
1823 .add_row(DataRow::new(vec![
1824 DataValue::Integer(2),
1825 DataValue::String("Complete".to_string()),
1826 DataValue::String("High".to_string()),
1827 ]))
1828 .unwrap();
1829
1830 table
1831 .add_row(DataRow::new(vec![
1832 DataValue::Integer(3),
1833 DataValue::String("Pending".to_string()),
1834 DataValue::String("Low".to_string()),
1835 ]))
1836 .unwrap();
1837
1838 table
1839 .add_row(DataRow::new(vec![
1840 DataValue::Integer(4),
1841 DataValue::String("Complete".to_string()),
1842 DataValue::String("Low".to_string()),
1843 ]))
1844 .unwrap();
1845
1846 let table = Arc::new(table);
1847 let engine = QueryEngine::new();
1848
1849 println!("\n=== Testing Parentheses in WHERE clause ===");
1850 println!("Table has {} rows", table.row_count());
1851 for i in 0..table.row_count() {
1852 let status = table.get_value(i, 1);
1853 let priority = table.get_value(i, 2);
1854 println!("Row {i}: status = {status:?}, priority = {priority:?}");
1855 }
1856
1857 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
1859 let result = engine.execute(
1860 table.clone(),
1861 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
1862 );
1863 match result {
1864 Ok(view) => {
1865 println!(
1866 "SUCCESS: Found {} rows with parenthetical logic",
1867 view.row_count()
1868 );
1869 assert_eq!(view.row_count(), 2); }
1871 Err(e) => {
1872 panic!("Parentheses query failed: {e}");
1873 }
1874 }
1875
1876 println!("\n=== Parentheses test complete! ===");
1877 }
1878
1879 #[test]
1880 #[ignore = "Numeric type coercion needs fixing"]
1881 fn test_numeric_type_coercion() {
1882 let _ = tracing_subscriber::fmt()
1884 .with_max_level(tracing::Level::DEBUG)
1885 .try_init();
1886
1887 let mut table = DataTable::new("test");
1888 table.add_column(DataColumn::new("id"));
1889 table.add_column(DataColumn::new("price"));
1890 table.add_column(DataColumn::new("quantity"));
1891
1892 table
1894 .add_row(DataRow::new(vec![
1895 DataValue::Integer(1),
1896 DataValue::Float(99.50), DataValue::Integer(100),
1898 ]))
1899 .unwrap();
1900
1901 table
1902 .add_row(DataRow::new(vec![
1903 DataValue::Integer(2),
1904 DataValue::Float(150.0), DataValue::Integer(200),
1906 ]))
1907 .unwrap();
1908
1909 table
1910 .add_row(DataRow::new(vec![
1911 DataValue::Integer(3),
1912 DataValue::Integer(75), DataValue::Integer(50),
1914 ]))
1915 .unwrap();
1916
1917 let table = Arc::new(table);
1918 let engine = QueryEngine::new();
1919
1920 println!("\n=== Testing Numeric Type Coercion ===");
1921 println!("Table has {} rows", table.row_count());
1922 for i in 0..table.row_count() {
1923 let price = table.get_value(i, 1);
1924 let quantity = table.get_value(i, 2);
1925 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
1926 }
1927
1928 println!("\n--- Test: price.Contains('.') ---");
1930 let result = engine.execute(
1931 table.clone(),
1932 "SELECT * FROM test WHERE price.Contains('.')",
1933 );
1934 match result {
1935 Ok(view) => {
1936 println!(
1937 "SUCCESS: Found {} rows with decimal points in price",
1938 view.row_count()
1939 );
1940 assert_eq!(view.row_count(), 2); }
1942 Err(e) => {
1943 panic!("Numeric Contains query failed: {e}");
1944 }
1945 }
1946
1947 println!("\n--- Test: quantity.Contains('0') ---");
1949 let result = engine.execute(
1950 table.clone(),
1951 "SELECT * FROM test WHERE quantity.Contains('0')",
1952 );
1953 match result {
1954 Ok(view) => {
1955 println!(
1956 "SUCCESS: Found {} rows with '0' in quantity",
1957 view.row_count()
1958 );
1959 assert_eq!(view.row_count(), 2); }
1961 Err(e) => {
1962 panic!("Integer Contains query failed: {e}");
1963 }
1964 }
1965
1966 println!("\n=== Numeric type coercion test complete! ===");
1967 }
1968
1969 #[test]
1970 fn test_datetime_comparisons() {
1971 let _ = tracing_subscriber::fmt()
1973 .with_max_level(tracing::Level::DEBUG)
1974 .try_init();
1975
1976 let mut table = DataTable::new("test");
1977 table.add_column(DataColumn::new("id"));
1978 table.add_column(DataColumn::new("created_date"));
1979
1980 table
1982 .add_row(DataRow::new(vec![
1983 DataValue::Integer(1),
1984 DataValue::String("2024-12-15".to_string()),
1985 ]))
1986 .unwrap();
1987
1988 table
1989 .add_row(DataRow::new(vec![
1990 DataValue::Integer(2),
1991 DataValue::String("2025-01-15".to_string()),
1992 ]))
1993 .unwrap();
1994
1995 table
1996 .add_row(DataRow::new(vec![
1997 DataValue::Integer(3),
1998 DataValue::String("2025-02-15".to_string()),
1999 ]))
2000 .unwrap();
2001
2002 let table = Arc::new(table);
2003 let engine = QueryEngine::new();
2004
2005 println!("\n=== Testing DateTime Comparisons ===");
2006 println!("Table has {} rows", table.row_count());
2007 for i in 0..table.row_count() {
2008 let date = table.get_value(i, 1);
2009 println!("Row {i}: created_date = {date:?}");
2010 }
2011
2012 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2014 let result = engine.execute(
2015 table.clone(),
2016 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2017 );
2018 match result {
2019 Ok(view) => {
2020 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2021 assert_eq!(view.row_count(), 2); }
2023 Err(e) => {
2024 panic!("DateTime comparison query failed: {e}");
2025 }
2026 }
2027
2028 println!("\n=== DateTime comparison test complete! ===");
2029 }
2030
2031 #[test]
2032 fn test_not_with_method_calls() {
2033 let _ = tracing_subscriber::fmt()
2035 .with_max_level(tracing::Level::DEBUG)
2036 .try_init();
2037
2038 let mut table = DataTable::new("test");
2039 table.add_column(DataColumn::new("id"));
2040 table.add_column(DataColumn::new("status"));
2041
2042 table
2044 .add_row(DataRow::new(vec![
2045 DataValue::Integer(1),
2046 DataValue::String("Pending Review".to_string()),
2047 ]))
2048 .unwrap();
2049
2050 table
2051 .add_row(DataRow::new(vec![
2052 DataValue::Integer(2),
2053 DataValue::String("Complete".to_string()),
2054 ]))
2055 .unwrap();
2056
2057 table
2058 .add_row(DataRow::new(vec![
2059 DataValue::Integer(3),
2060 DataValue::String("Pending Approval".to_string()),
2061 ]))
2062 .unwrap();
2063
2064 let table = Arc::new(table);
2065 let engine = QueryEngine::with_case_insensitive(true);
2066
2067 println!("\n=== Testing NOT with Method Calls ===");
2068 println!("Table has {} rows", table.row_count());
2069 for i in 0..table.row_count() {
2070 let status = table.get_value(i, 1);
2071 println!("Row {i}: status = {status:?}");
2072 }
2073
2074 println!("\n--- Test: NOT status.Contains('pend') ---");
2076 let result = engine.execute(
2077 table.clone(),
2078 "SELECT * FROM test WHERE NOT status.Contains('pend')",
2079 );
2080 match result {
2081 Ok(view) => {
2082 println!(
2083 "SUCCESS: Found {} rows NOT containing 'pend'",
2084 view.row_count()
2085 );
2086 assert_eq!(view.row_count(), 1); }
2088 Err(e) => {
2089 panic!("NOT Contains query failed: {e}");
2090 }
2091 }
2092
2093 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2095 let result = engine.execute(
2096 table.clone(),
2097 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2098 );
2099 match result {
2100 Ok(view) => {
2101 println!(
2102 "SUCCESS: Found {} rows NOT starting with 'Pending'",
2103 view.row_count()
2104 );
2105 assert_eq!(view.row_count(), 1); }
2107 Err(e) => {
2108 panic!("NOT StartsWith query failed: {e}");
2109 }
2110 }
2111
2112 println!("\n=== NOT with method calls test complete! ===");
2113 }
2114
2115 #[test]
2116 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2117 fn test_complex_logical_expressions() {
2118 let _ = tracing_subscriber::fmt()
2120 .with_max_level(tracing::Level::DEBUG)
2121 .try_init();
2122
2123 let mut table = DataTable::new("test");
2124 table.add_column(DataColumn::new("id"));
2125 table.add_column(DataColumn::new("status"));
2126 table.add_column(DataColumn::new("priority"));
2127 table.add_column(DataColumn::new("assigned"));
2128
2129 table
2131 .add_row(DataRow::new(vec![
2132 DataValue::Integer(1),
2133 DataValue::String("Pending".to_string()),
2134 DataValue::String("High".to_string()),
2135 DataValue::String("John".to_string()),
2136 ]))
2137 .unwrap();
2138
2139 table
2140 .add_row(DataRow::new(vec![
2141 DataValue::Integer(2),
2142 DataValue::String("Complete".to_string()),
2143 DataValue::String("High".to_string()),
2144 DataValue::String("Jane".to_string()),
2145 ]))
2146 .unwrap();
2147
2148 table
2149 .add_row(DataRow::new(vec![
2150 DataValue::Integer(3),
2151 DataValue::String("Pending".to_string()),
2152 DataValue::String("Low".to_string()),
2153 DataValue::String("John".to_string()),
2154 ]))
2155 .unwrap();
2156
2157 table
2158 .add_row(DataRow::new(vec![
2159 DataValue::Integer(4),
2160 DataValue::String("In Progress".to_string()),
2161 DataValue::String("Medium".to_string()),
2162 DataValue::String("Jane".to_string()),
2163 ]))
2164 .unwrap();
2165
2166 let table = Arc::new(table);
2167 let engine = QueryEngine::new();
2168
2169 println!("\n=== Testing Complex Logical Expressions ===");
2170 println!("Table has {} rows", table.row_count());
2171 for i in 0..table.row_count() {
2172 let status = table.get_value(i, 1);
2173 let priority = table.get_value(i, 2);
2174 let assigned = table.get_value(i, 3);
2175 println!(
2176 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2177 );
2178 }
2179
2180 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2182 let result = engine.execute(
2183 table.clone(),
2184 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2185 );
2186 match result {
2187 Ok(view) => {
2188 println!(
2189 "SUCCESS: Found {} rows with complex logic",
2190 view.row_count()
2191 );
2192 assert_eq!(view.row_count(), 2); }
2194 Err(e) => {
2195 panic!("Complex logic query failed: {e}");
2196 }
2197 }
2198
2199 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2201 let result = engine.execute(
2202 table.clone(),
2203 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2204 );
2205 match result {
2206 Ok(view) => {
2207 println!(
2208 "SUCCESS: Found {} rows with NOT complex logic",
2209 view.row_count()
2210 );
2211 assert_eq!(view.row_count(), 2); }
2213 Err(e) => {
2214 panic!("NOT complex logic query failed: {e}");
2215 }
2216 }
2217
2218 println!("\n=== Complex logical expressions test complete! ===");
2219 }
2220
2221 #[test]
2222 fn test_mixed_data_types_and_edge_cases() {
2223 let _ = tracing_subscriber::fmt()
2225 .with_max_level(tracing::Level::DEBUG)
2226 .try_init();
2227
2228 let mut table = DataTable::new("test");
2229 table.add_column(DataColumn::new("id"));
2230 table.add_column(DataColumn::new("value"));
2231 table.add_column(DataColumn::new("nullable_field"));
2232
2233 table
2235 .add_row(DataRow::new(vec![
2236 DataValue::Integer(1),
2237 DataValue::String("123.45".to_string()),
2238 DataValue::String("present".to_string()),
2239 ]))
2240 .unwrap();
2241
2242 table
2243 .add_row(DataRow::new(vec![
2244 DataValue::Integer(2),
2245 DataValue::Float(678.90),
2246 DataValue::Null,
2247 ]))
2248 .unwrap();
2249
2250 table
2251 .add_row(DataRow::new(vec![
2252 DataValue::Integer(3),
2253 DataValue::Boolean(true),
2254 DataValue::String("also present".to_string()),
2255 ]))
2256 .unwrap();
2257
2258 table
2259 .add_row(DataRow::new(vec![
2260 DataValue::Integer(4),
2261 DataValue::String("false".to_string()),
2262 DataValue::Null,
2263 ]))
2264 .unwrap();
2265
2266 let table = Arc::new(table);
2267 let engine = QueryEngine::new();
2268
2269 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
2270 println!("Table has {} rows", table.row_count());
2271 for i in 0..table.row_count() {
2272 let value = table.get_value(i, 1);
2273 let nullable = table.get_value(i, 2);
2274 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
2275 }
2276
2277 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
2279 let result = engine.execute(
2280 table.clone(),
2281 "SELECT * FROM test WHERE value.Contains('true')",
2282 );
2283 match result {
2284 Ok(view) => {
2285 println!(
2286 "SUCCESS: Found {} rows with boolean coercion",
2287 view.row_count()
2288 );
2289 assert_eq!(view.row_count(), 1); }
2291 Err(e) => {
2292 panic!("Boolean coercion query failed: {e}");
2293 }
2294 }
2295
2296 println!("\n--- Test: id IN (1, 3) ---");
2298 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
2299 match result {
2300 Ok(view) => {
2301 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
2302 assert_eq!(view.row_count(), 2); }
2304 Err(e) => {
2305 panic!("Multiple IN values query failed: {e}");
2306 }
2307 }
2308
2309 println!("\n=== Mixed data types test complete! ===");
2310 }
2311
2312 #[test]
2314 fn test_aggregate_only_single_row() {
2315 let table = create_test_stock_data();
2316 let engine = QueryEngine::new();
2317
2318 let result = engine
2320 .execute(
2321 table.clone(),
2322 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
2323 )
2324 .expect("Query should succeed");
2325
2326 assert_eq!(
2327 result.row_count(),
2328 1,
2329 "Aggregate-only query should return exactly 1 row"
2330 );
2331 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
2332
2333 let source = result.source();
2335 let row = source.get_row(0).expect("Should have first row");
2336
2337 assert_eq!(row.values[0], DataValue::Integer(5));
2339
2340 assert_eq!(row.values[1], DataValue::Float(99.5));
2342
2343 assert_eq!(row.values[2], DataValue::Float(105.0));
2345
2346 if let DataValue::Float(avg) = &row.values[3] {
2348 assert!(
2349 (avg - 102.4).abs() < 0.01,
2350 "Average should be approximately 102.4, got {}",
2351 avg
2352 );
2353 } else {
2354 panic!("AVG should return a Float value");
2355 }
2356 }
2357
2358 #[test]
2360 fn test_single_aggregate_single_row() {
2361 let table = create_test_stock_data();
2362 let engine = QueryEngine::new();
2363
2364 let result = engine
2365 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
2366 .expect("Query should succeed");
2367
2368 assert_eq!(
2369 result.row_count(),
2370 1,
2371 "Single aggregate query should return exactly 1 row"
2372 );
2373 assert_eq!(result.column_count(), 1, "Should have 1 column");
2374
2375 let source = result.source();
2376 let row = source.get_row(0).expect("Should have first row");
2377 assert_eq!(row.values[0], DataValue::Integer(5));
2378 }
2379
2380 #[test]
2382 fn test_aggregate_with_where_single_row() {
2383 let table = create_test_stock_data();
2384 let engine = QueryEngine::new();
2385
2386 let result = engine
2388 .execute(
2389 table.clone(),
2390 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
2391 )
2392 .expect("Query should succeed");
2393
2394 assert_eq!(
2395 result.row_count(),
2396 1,
2397 "Filtered aggregate query should return exactly 1 row"
2398 );
2399 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
2400
2401 let source = result.source();
2402 let row = source.get_row(0).expect("Should have first row");
2403
2404 assert_eq!(row.values[0], DataValue::Integer(2));
2406 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
2409
2410 #[test]
2411 fn test_not_in_parsing() {
2412 use crate::sql::recursive_parser::Parser;
2413
2414 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
2415 println!("\n=== Testing NOT IN parsing ===");
2416 println!("Parsing query: {query}");
2417
2418 let mut parser = Parser::new(query);
2419 match parser.parse() {
2420 Ok(statement) => {
2421 println!("Parsed statement: {statement:#?}");
2422 if let Some(where_clause) = statement.where_clause {
2423 println!("WHERE conditions: {:#?}", where_clause.conditions);
2424 if let Some(first_condition) = where_clause.conditions.first() {
2425 println!("First condition expression: {:#?}", first_condition.expr);
2426 }
2427 }
2428 }
2429 Err(e) => {
2430 panic!("Parse error: {e}");
2431 }
2432 }
2433 }
2434
2435 fn create_test_stock_data() -> Arc<DataTable> {
2437 let mut table = DataTable::new("stock");
2438
2439 table.add_column(DataColumn::new("symbol"));
2440 table.add_column(DataColumn::new("close"));
2441 table.add_column(DataColumn::new("volume"));
2442
2443 let test_data = vec![
2445 ("AAPL", 99.5, 1000),
2446 ("AAPL", 101.2, 1500),
2447 ("AAPL", 103.5, 2000),
2448 ("AAPL", 105.0, 1200),
2449 ("AAPL", 102.8, 1800),
2450 ];
2451
2452 for (symbol, close, volume) in test_data {
2453 table
2454 .add_row(DataRow::new(vec![
2455 DataValue::String(symbol.to_string()),
2456 DataValue::Float(close),
2457 DataValue::Integer(volume),
2458 ]))
2459 .expect("Should add row successfully");
2460 }
2461
2462 Arc::new(table)
2463 }
2464}
2465
2466#[cfg(test)]
2467#[path = "query_engine_tests.rs"]
2468mod query_engine_tests;