1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::recursive_parser::{
27 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
28 TableFunction,
29};
30
31#[derive(Debug, Clone)]
33pub struct ExecutionContext {
34 alias_map: HashMap<String, String>,
37}
38
39impl ExecutionContext {
40 pub fn new() -> Self {
42 Self {
43 alias_map: HashMap::new(),
44 }
45 }
46
47 pub fn register_alias(&mut self, alias: String, table_name: String) {
49 debug!("Registering alias: {} -> {}", alias, table_name);
50 self.alias_map.insert(alias, table_name);
51 }
52
53 pub fn resolve_alias(&self, name: &str) -> String {
56 self.alias_map
57 .get(name)
58 .cloned()
59 .unwrap_or_else(|| name.to_string())
60 }
61
62 pub fn is_alias(&self, name: &str) -> bool {
64 self.alias_map.contains_key(name)
65 }
66
67 pub fn get_aliases(&self) -> HashMap<String, String> {
69 self.alias_map.clone()
70 }
71
72 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
87 if let Some(table_prefix) = &column_ref.table_prefix {
88 let actual_table = self.resolve_alias(table_prefix);
90
91 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
93 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
94 debug!(
95 "Resolved {}.{} -> qualified column '{}' at index {}",
96 table_prefix, column_ref.name, qualified_name, idx
97 );
98 return Ok(idx);
99 }
100
101 if let Some(idx) = table.get_column_index(&column_ref.name) {
103 debug!(
104 "Resolved {}.{} -> unqualified column '{}' at index {}",
105 table_prefix, column_ref.name, column_ref.name, idx
106 );
107 return Ok(idx);
108 }
109
110 Err(anyhow!(
112 "Column '{}' not found. Table '{}' may not support qualified column names",
113 qualified_name,
114 actual_table
115 ))
116 } else {
117 if let Some(idx) = table.get_column_index(&column_ref.name) {
119 debug!(
120 "Resolved unqualified column '{}' at index {}",
121 column_ref.name, idx
122 );
123 return Ok(idx);
124 }
125
126 if column_ref.name.contains('.') {
128 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
129 debug!(
130 "Resolved '{}' as qualified column at index {}",
131 column_ref.name, idx
132 );
133 return Ok(idx);
134 }
135 }
136
137 let suggestion = self.find_similar_column(table, &column_ref.name);
139 match suggestion {
140 Some(similar) => Err(anyhow!(
141 "Column '{}' not found. Did you mean '{}'?",
142 column_ref.name,
143 similar
144 )),
145 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
146 }
147 }
148 }
149
150 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
152 let columns = table.column_names();
153 let mut best_match: Option<(String, usize)> = None;
154
155 for col in columns {
156 let distance = edit_distance(name, &col);
157 if distance <= 2 {
158 match best_match {
160 Some((_, best_dist)) if distance < best_dist => {
161 best_match = Some((col.clone(), distance));
162 }
163 None => {
164 best_match = Some((col.clone(), distance));
165 }
166 _ => {}
167 }
168 }
169 }
170
171 best_match.map(|(name, _)| name)
172 }
173}
174
175impl Default for ExecutionContext {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
/// Levenshtein distance between `a` and `b`, measured in `char`s: the minimum
/// number of single-character insertions, deletions and substitutions needed to
/// turn one string into the other.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only two rows of the DP table are ever needed: the one for the previous
    // prefix of `a` and the one being filled in for the current prefix.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, &from) in source.iter().enumerate() {
        // Turning `row + 1` chars into the empty string costs `row + 1` deletions.
        current[0] = row + 1;

        for (col, &to) in target.iter().enumerate() {
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            let substitution = previous[col] + usize::from(from != to);
            current[col + 1] = deletion.min(insertion).min(substitution);
        }

        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last completed row lives in `previous`.
    previous[target.len()]
}
221
/// Executes SQL `SELECT` statements against an in-memory `DataTable`.
#[derive(Clone)]
pub struct QueryEngine {
    /// Handed to the WHERE evaluation context and to the hash-join executor.
    case_insensitive: bool,
    /// Handed to the arithmetic evaluator when evaluating table-function arguments.
    date_notation: String,
    /// Config supplied through `with_behavior_config`, `None` otherwise.
    /// NOTE(review): not read anywhere in the code reviewed here.
    _behavior_config: Option<BehaviorConfig>,
}
229
230impl Default for QueryEngine {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl QueryEngine {
237 #[must_use]
238 pub fn new() -> Self {
239 Self {
240 case_insensitive: false,
241 date_notation: get_date_notation(),
242 _behavior_config: None,
243 }
244 }
245
246 #[must_use]
247 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
248 let case_insensitive = config.case_insensitive_default;
249 let date_notation = get_date_notation();
251 Self {
252 case_insensitive,
253 date_notation,
254 _behavior_config: Some(config),
255 }
256 }
257
258 #[must_use]
259 pub fn with_date_notation(_date_notation: String) -> Self {
260 Self {
261 case_insensitive: false,
262 date_notation: get_date_notation(), _behavior_config: None,
264 }
265 }
266
267 #[must_use]
268 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
269 Self {
270 case_insensitive,
271 date_notation: get_date_notation(),
272 _behavior_config: None,
273 }
274 }
275
276 #[must_use]
277 pub fn with_case_insensitive_and_date_notation(
278 case_insensitive: bool,
279 _date_notation: String, ) -> Self {
281 Self {
282 case_insensitive,
283 date_notation: get_date_notation(), _behavior_config: None,
285 }
286 }
287
288 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
290 let columns = table.column_names();
291 let mut best_match: Option<(String, usize)> = None;
292
293 for col in columns {
294 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
295 let max_distance = if name.len() > 10 { 3 } else { 2 };
298 if distance <= max_distance {
299 match &best_match {
300 None => best_match = Some((col, distance)),
301 Some((_, best_dist)) if distance < *best_dist => {
302 best_match = Some((col, distance));
303 }
304 _ => {}
305 }
306 }
307 }
308
309 best_match.map(|(name, _)| name)
310 }
311
312 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
314 let len1 = s1.len();
315 let len2 = s2.len();
316 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
317
318 for i in 0..=len1 {
319 matrix[i][0] = i;
320 }
321 for j in 0..=len2 {
322 matrix[0][j] = j;
323 }
324
325 for (i, c1) in s1.chars().enumerate() {
326 for (j, c2) in s2.chars().enumerate() {
327 let cost = usize::from(c1 != c2);
328 matrix[i + 1][j + 1] = std::cmp::min(
329 matrix[i][j + 1] + 1, std::cmp::min(
331 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
334 );
335 }
336 }
337
338 matrix[len1][len2]
339 }
340
341 fn contains_unnest(expr: &SqlExpression) -> bool {
343 match expr {
344 SqlExpression::Unnest { .. } => true,
346 SqlExpression::FunctionCall { name, args, .. } => {
347 if name.to_uppercase() == "UNNEST" {
348 return true;
349 }
350 args.iter().any(Self::contains_unnest)
352 }
353 SqlExpression::BinaryOp { left, right, .. } => {
354 Self::contains_unnest(left) || Self::contains_unnest(right)
355 }
356 SqlExpression::Not { expr } => Self::contains_unnest(expr),
357 SqlExpression::CaseExpression {
358 when_branches,
359 else_branch,
360 } => {
361 when_branches.iter().any(|branch| {
362 Self::contains_unnest(&branch.condition)
363 || Self::contains_unnest(&branch.result)
364 }) || else_branch
365 .as_ref()
366 .map_or(false, |e| Self::contains_unnest(e))
367 }
368 SqlExpression::SimpleCaseExpression {
369 expr,
370 when_branches,
371 else_branch,
372 } => {
373 Self::contains_unnest(expr)
374 || when_branches.iter().any(|branch| {
375 Self::contains_unnest(&branch.value)
376 || Self::contains_unnest(&branch.result)
377 })
378 || else_branch
379 .as_ref()
380 .map_or(false, |e| Self::contains_unnest(e))
381 }
382 SqlExpression::InList { expr, values } => {
383 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
384 }
385 SqlExpression::NotInList { expr, values } => {
386 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
387 }
388 SqlExpression::Between { expr, lower, upper } => {
389 Self::contains_unnest(expr)
390 || Self::contains_unnest(lower)
391 || Self::contains_unnest(upper)
392 }
393 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
394 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
397 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::ChainedMethodCall { base, args, .. } => {
399 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
400 }
401 _ => false,
402 }
403 }
404
405 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407 let (view, _plan) = self.execute_with_plan(table, sql)?;
408 Ok(view)
409 }
410
411 pub fn execute_with_temp_tables(
413 &self,
414 table: Arc<DataTable>,
415 sql: &str,
416 temp_tables: Option<&TempTableRegistry>,
417 ) -> Result<DataView> {
418 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419 Ok(view)
420 }
421
422 pub fn execute_statement(
424 &self,
425 table: Arc<DataTable>,
426 statement: SelectStatement,
427 ) -> Result<DataView> {
428 self.execute_statement_with_temp_tables(table, statement, None)
429 }
430
431 pub fn execute_statement_with_temp_tables(
433 &self,
434 table: Arc<DataTable>,
435 statement: SelectStatement,
436 temp_tables: Option<&TempTableRegistry>,
437 ) -> Result<DataView> {
438 let mut cte_context = HashMap::new();
440
441 if let Some(temp_registry) = temp_tables {
443 for table_name in temp_registry.list_tables() {
444 if let Some(temp_table) = temp_registry.get(&table_name) {
445 debug!("Adding temp table {} to CTE context", table_name);
446 let view = DataView::new(temp_table);
447 cte_context.insert(table_name, Arc::new(view));
448 }
449 }
450 }
451
452 for cte in &statement.ctes {
453 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454 let cte_result = match &cte.cte_type {
456 CTEType::Standard(query) => {
457 let view = self.build_view_with_context(
459 table.clone(),
460 query.clone(),
461 &mut cte_context,
462 )?;
463
464 let mut materialized = self.materialize_view(view)?;
466
467 for column in materialized.columns_mut() {
469 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470 column.source_table = Some(cte.name.clone());
471 }
472
473 DataView::new(Arc::new(materialized))
474 }
475 CTEType::Web(web_spec) => {
476 use crate::web::http_fetcher::WebDataFetcher;
478
479 let fetcher = WebDataFetcher::new()?;
480 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483 for column in data_table.columns_mut() {
485 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486 column.source_table = Some(cte.name.clone());
487 }
488
489 DataView::new(Arc::new(data_table))
491 }
492 };
493 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495 debug!(
496 "QueryEngine: CTE '{}' pre-processed, stored in context",
497 cte.name
498 );
499 }
500
501 let mut subquery_executor =
503 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506 self.build_view_with_context(table, processed_statement, &mut cte_context)
508 }
509
510 pub fn execute_statement_with_cte_context(
512 &self,
513 table: Arc<DataTable>,
514 statement: SelectStatement,
515 cte_context: &HashMap<String, Arc<DataView>>,
516 ) -> Result<DataView> {
517 let mut local_context = cte_context.clone();
519
520 for cte in &statement.ctes {
522 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523 let cte_result = match &cte.cte_type {
524 CTEType::Standard(query) => {
525 let view = self.build_view_with_context(
526 table.clone(),
527 query.clone(),
528 &mut local_context,
529 )?;
530
531 let mut materialized = self.materialize_view(view)?;
533
534 for column in materialized.columns_mut() {
536 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537 column.source_table = Some(cte.name.clone());
538 }
539
540 DataView::new(Arc::new(materialized))
541 }
542 CTEType::Web(web_spec) => {
543 use crate::web::http_fetcher::WebDataFetcher;
545
546 let fetcher = WebDataFetcher::new()?;
547 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550 for column in data_table.columns_mut() {
552 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553 column.source_table = Some(cte.name.clone());
554 }
555
556 DataView::new(Arc::new(data_table))
558 }
559 };
560 local_context.insert(cte.name.clone(), Arc::new(cte_result));
561 }
562
563 let mut subquery_executor =
565 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568 self.build_view_with_context(table, processed_statement, &mut local_context)
570 }
571
572 pub fn execute_with_plan(
574 &self,
575 table: Arc<DataTable>,
576 sql: &str,
577 ) -> Result<(DataView, ExecutionPlan)> {
578 self.execute_with_plan_and_temp_tables(table, sql, None)
579 }
580
581 pub fn execute_with_plan_and_temp_tables(
583 &self,
584 table: Arc<DataTable>,
585 sql: &str,
586 temp_tables: Option<&TempTableRegistry>,
587 ) -> Result<(DataView, ExecutionPlan)> {
588 let mut plan_builder = ExecutionPlanBuilder::new();
589 let start_time = Instant::now();
590
591 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
593 plan_builder.add_detail(format!("Query: {}", sql));
594 let mut parser = Parser::new(sql);
595 let statement = parser
596 .parse()
597 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
598 plan_builder.add_detail(format!("Parsed successfully"));
599 if let Some(from) = &statement.from_table {
600 plan_builder.add_detail(format!("FROM: {}", from));
601 }
602 if statement.where_clause.is_some() {
603 plan_builder.add_detail("WHERE clause present".to_string());
604 }
605 plan_builder.end_step();
606
607 let mut cte_context = HashMap::new();
609
610 if let Some(temp_registry) = temp_tables {
612 for table_name in temp_registry.list_tables() {
613 if let Some(temp_table) = temp_registry.get(&table_name) {
614 debug!("Adding temp table {} to CTE context", table_name);
615 let view = DataView::new(temp_table);
616 cte_context.insert(table_name, Arc::new(view));
617 }
618 }
619 }
620
621 if !statement.ctes.is_empty() {
622 plan_builder.begin_step(
623 StepType::CTE,
624 format!("Process {} CTEs", statement.ctes.len()),
625 );
626
627 for cte in &statement.ctes {
628 let cte_start = Instant::now();
629 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
630
631 let cte_result = match &cte.cte_type {
632 CTEType::Standard(query) => {
633 if let Some(from) = &query.from_table {
635 plan_builder.add_detail(format!("Source: {}", from));
636 }
637 if query.where_clause.is_some() {
638 plan_builder.add_detail("Has WHERE clause".to_string());
639 }
640 if query.group_by.is_some() {
641 plan_builder.add_detail("Has GROUP BY".to_string());
642 }
643
644 debug!(
645 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
646 cte.name,
647 cte_context.keys().collect::<Vec<_>>()
648 );
649
650 let mut subquery_executor = SubqueryExecutor::with_cte_context(
653 self.clone(),
654 table.clone(),
655 cte_context.clone(),
656 );
657 let processed_query = subquery_executor.execute_subqueries(query)?;
658
659 let view = self.build_view_with_context(
660 table.clone(),
661 processed_query,
662 &mut cte_context,
663 )?;
664
665 let mut materialized = self.materialize_view(view)?;
667
668 for column in materialized.columns_mut() {
670 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
671 column.source_table = Some(cte.name.clone());
672 }
673
674 DataView::new(Arc::new(materialized))
675 }
676 CTEType::Web(web_spec) => {
677 plan_builder.add_detail(format!("URL: {}", web_spec.url));
678 if let Some(format) = &web_spec.format {
679 plan_builder.add_detail(format!("Format: {:?}", format));
680 }
681 if let Some(cache) = web_spec.cache_seconds {
682 plan_builder.add_detail(format!("Cache: {} seconds", cache));
683 }
684
685 use crate::web::http_fetcher::WebDataFetcher;
687
688 let fetcher = WebDataFetcher::new()?;
689 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
691
692 for column in data_table.columns_mut() {
694 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
695 column.source_table = Some(cte.name.clone());
696 }
697
698 DataView::new(Arc::new(data_table))
700 }
701 };
702
703 plan_builder.set_rows_out(cte_result.row_count());
705 plan_builder.add_detail(format!(
706 "Result: {} rows, {} columns",
707 cte_result.row_count(),
708 cte_result.column_count()
709 ));
710 plan_builder.add_detail(format!(
711 "Execution time: {:.3}ms",
712 cte_start.elapsed().as_secs_f64() * 1000.0
713 ));
714
715 debug!(
716 "QueryEngine: Storing CTE '{}' in context with {} rows",
717 cte.name,
718 cte_result.row_count()
719 );
720 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
721 plan_builder.end_step();
722 }
723
724 plan_builder.add_detail(format!(
725 "All {} CTEs cached in context",
726 statement.ctes.len()
727 ));
728 plan_builder.end_step();
729 }
730
731 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
733 let mut subquery_executor =
734 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
735
736 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
738 format!("{:?}", w).contains("Subquery")
740 });
741
742 if has_subqueries {
743 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
744 }
745
746 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
747
748 if has_subqueries {
749 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
750 } else {
751 plan_builder.add_detail("No subqueries to process".to_string());
752 }
753
754 plan_builder.end_step();
755 let result = self.build_view_with_context_and_plan(
756 table,
757 processed_statement,
758 &mut cte_context,
759 &mut plan_builder,
760 )?;
761
762 let total_duration = start_time.elapsed();
763 info!(
764 "Query execution complete: total={:?}, rows={}",
765 total_duration,
766 result.row_count()
767 );
768
769 let plan = plan_builder.build();
770 Ok((result, plan))
771 }
772
773 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775 let mut cte_context = HashMap::new();
776 self.build_view_with_context(table, statement, &mut cte_context)
777 }
778
779 fn build_view_with_context(
781 &self,
782 table: Arc<DataTable>,
783 statement: SelectStatement,
784 cte_context: &mut HashMap<String, Arc<DataView>>,
785 ) -> Result<DataView> {
786 let mut dummy_plan = ExecutionPlanBuilder::new();
787 let mut exec_context = ExecutionContext::new();
788 self.build_view_with_context_and_plan_and_exec(
789 table,
790 statement,
791 cte_context,
792 &mut dummy_plan,
793 &mut exec_context,
794 )
795 }
796
797 fn build_view_with_context_and_plan(
799 &self,
800 table: Arc<DataTable>,
801 statement: SelectStatement,
802 cte_context: &mut HashMap<String, Arc<DataView>>,
803 plan: &mut ExecutionPlanBuilder,
804 ) -> Result<DataView> {
805 let mut exec_context = ExecutionContext::new();
806 self.build_view_with_context_and_plan_and_exec(
807 table,
808 statement,
809 cte_context,
810 plan,
811 &mut exec_context,
812 )
813 }
814
815 fn build_view_with_context_and_plan_and_exec(
817 &self,
818 table: Arc<DataTable>,
819 statement: SelectStatement,
820 cte_context: &mut HashMap<String, Arc<DataView>>,
821 plan: &mut ExecutionPlanBuilder,
822 exec_context: &mut ExecutionContext,
823 ) -> Result<DataView> {
824 for cte in &statement.ctes {
826 if cte_context.contains_key(&cte.name) {
828 debug!(
829 "QueryEngine: CTE '{}' already in context, skipping",
830 cte.name
831 );
832 continue;
833 }
834
835 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836 debug!(
837 "QueryEngine: Available CTEs for '{}': {:?}",
838 cte.name,
839 cte_context.keys().collect::<Vec<_>>()
840 );
841
842 let cte_result = match &cte.cte_type {
844 CTEType::Standard(query) => {
845 let view =
846 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848 let mut materialized = self.materialize_view(view)?;
850
851 for column in materialized.columns_mut() {
853 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854 column.source_table = Some(cte.name.clone());
855 }
856
857 DataView::new(Arc::new(materialized))
858 }
859 CTEType::Web(_web_spec) => {
860 return Err(anyhow!(
862 "Web CTEs should be processed in execute_select method"
863 ));
864 }
865 };
866
867 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869 debug!(
870 "QueryEngine: CTE '{}' processed, stored in context",
871 cte.name
872 );
873 }
874
875 let source_table = if let Some(ref table_func) = statement.from_function {
877 debug!("QueryEngine: Processing table function...");
879 match table_func {
880 TableFunction::Generator { name, args } => {
881 use crate::sql::generators::GeneratorRegistry;
883
884 let registry = GeneratorRegistry::new();
886
887 if let Some(generator) = registry.get(name) {
888 let mut evaluator = ArithmeticEvaluator::with_date_notation(
890 &table,
891 self.date_notation.clone(),
892 );
893 let dummy_row = 0;
894
895 let mut evaluated_args = Vec::new();
896 for arg in args {
897 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898 }
899
900 generator.generate(evaluated_args)?
902 } else {
903 return Err(anyhow!("Unknown generator function: {}", name));
904 }
905 }
906 }
907 } else if let Some(ref subquery) = statement.from_subquery {
908 debug!("QueryEngine: Processing FROM subquery...");
910 let subquery_result =
911 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913 let materialized = self.materialize_view(subquery_result)?;
916 Arc::new(materialized)
917 } else if let Some(ref table_name) = statement.from_table {
918 if let Some(cte_view) = cte_context.get(table_name) {
920 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921 let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924 if let Some(ref alias) = statement.from_alias {
926 debug!(
927 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928 alias, table_name
929 );
930 for column in materialized.columns_mut() {
931 if let Some(ref qualified_name) = column.qualified_name {
933 if qualified_name.starts_with(&format!("{}.", table_name)) {
934 column.qualified_name =
935 Some(qualified_name.replace(
936 &format!("{}.", table_name),
937 &format!("{}.", alias),
938 ));
939 }
940 }
941 if column.source_table.as_ref() == Some(table_name) {
943 column.source_table = Some(alias.clone());
944 }
945 }
946 }
947
948 Arc::new(materialized)
949 } else {
950 table.clone()
952 }
953 } else {
954 table.clone()
956 };
957
958 if let Some(ref alias) = statement.from_alias {
960 if let Some(ref table_name) = statement.from_table {
961 exec_context.register_alias(alias.clone(), table_name.clone());
962 }
963 }
964
965 let final_table = if !statement.joins.is_empty() {
967 plan.begin_step(
968 StepType::Join,
969 format!("Process {} JOINs", statement.joins.len()),
970 );
971 plan.set_rows_in(source_table.row_count());
972
973 let join_executor = HashJoinExecutor::new(self.case_insensitive);
974 let mut current_table = source_table;
975
976 for (idx, join_clause) in statement.joins.iter().enumerate() {
977 let join_start = Instant::now();
978 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981 plan.add_detail(format!(
982 "Executing {:?} JOIN on {} condition(s)",
983 join_clause.join_type,
984 join_clause.condition.conditions.len()
985 ));
986
987 let right_table = match &join_clause.table {
989 TableSource::Table(name) => {
990 if let Some(cte_view) = cte_context.get(name) {
992 let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994 if let Some(ref alias) = join_clause.alias {
996 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997 for column in materialized.columns_mut() {
998 if let Some(ref qualified_name) = column.qualified_name {
1000 if qualified_name.starts_with(&format!("{}.", name)) {
1001 column.qualified_name = Some(qualified_name.replace(
1002 &format!("{}.", name),
1003 &format!("{}.", alias),
1004 ));
1005 }
1006 }
1007 if column.source_table.as_ref() == Some(name) {
1009 column.source_table = Some(alias.clone());
1010 }
1011 }
1012 }
1013
1014 Arc::new(materialized)
1015 } else {
1016 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019 }
1020 }
1021 TableSource::DerivedTable { query, alias: _ } => {
1022 let subquery_result = self.build_view_with_context(
1024 table.clone(),
1025 *query.clone(),
1026 cte_context,
1027 )?;
1028 let materialized = self.materialize_view(subquery_result)?;
1029 Arc::new(materialized)
1030 }
1031 };
1032
1033 let joined = join_executor.execute_join(
1035 current_table.clone(),
1036 join_clause,
1037 right_table.clone(),
1038 )?;
1039
1040 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041 plan.set_rows_out(joined.row_count());
1042 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043 plan.add_detail(format!(
1044 "Join time: {:.3}ms",
1045 join_start.elapsed().as_secs_f64() * 1000.0
1046 ));
1047 plan.end_step();
1048
1049 current_table = Arc::new(joined);
1050 }
1051
1052 plan.set_rows_out(current_table.row_count());
1053 plan.add_detail(format!(
1054 "Final result after all joins: {} rows",
1055 current_table.row_count()
1056 ));
1057 plan.end_step();
1058 current_table
1059 } else {
1060 source_table
1061 };
1062
1063 self.build_view_internal_with_plan_and_exec(
1065 final_table,
1066 statement,
1067 plan,
1068 Some(exec_context),
1069 )
1070 }
1071
1072 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074 let source = view.source();
1075 let mut result_table = DataTable::new("derived");
1076
1077 let visible_cols = view.visible_column_indices().to_vec();
1079
1080 for col_idx in &visible_cols {
1082 let col = &source.columns[*col_idx];
1083 let new_col = DataColumn {
1084 name: col.name.clone(),
1085 data_type: col.data_type.clone(),
1086 nullable: col.nullable,
1087 unique_values: col.unique_values,
1088 null_count: col.null_count,
1089 metadata: col.metadata.clone(),
1090 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1093 result_table.add_column(new_col);
1094 }
1095
1096 for row_idx in view.visible_row_indices() {
1098 let source_row = &source.rows[*row_idx];
1099 let mut new_row = DataRow { values: Vec::new() };
1100
1101 for col_idx in &visible_cols {
1102 new_row.values.push(source_row.values[*col_idx].clone());
1103 }
1104
1105 result_table.add_row(new_row);
1106 }
1107
1108 Ok(result_table)
1109 }
1110
1111 fn build_view_internal(
1112 &self,
1113 table: Arc<DataTable>,
1114 statement: SelectStatement,
1115 ) -> Result<DataView> {
1116 let mut dummy_plan = ExecutionPlanBuilder::new();
1117 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118 }
1119
1120 fn build_view_internal_with_plan(
1121 &self,
1122 table: Arc<DataTable>,
1123 statement: SelectStatement,
1124 plan: &mut ExecutionPlanBuilder,
1125 ) -> Result<DataView> {
1126 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1127 }
1128
1129 fn build_view_internal_with_plan_and_exec(
1130 &self,
1131 table: Arc<DataTable>,
1132 statement: SelectStatement,
1133 plan: &mut ExecutionPlanBuilder,
1134 exec_context: Option<&ExecutionContext>,
1135 ) -> Result<DataView> {
1136 debug!(
1137 "QueryEngine::build_view - select_items: {:?}",
1138 statement.select_items
1139 );
1140 debug!(
1141 "QueryEngine::build_view - where_clause: {:?}",
1142 statement.where_clause
1143 );
1144
1145 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148 if let Some(where_clause) = &statement.where_clause {
1150 let total_rows = table.row_count();
1151 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155 plan.set_rows_in(total_rows);
1156 plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158 for condition in &where_clause.conditions {
1160 plan.add_detail(format!("Condition: {:?}", condition.expr));
1161 }
1162
1163 let filter_start = Instant::now();
1164 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167 let mut evaluator = if let Some(exec_ctx) = exec_context {
1169 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1171 } else {
1172 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1173 };
1174
1175 let mut filtered_rows = Vec::new();
1177 for row_idx in visible_rows {
1178 if row_idx < 3 {
1180 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1181 }
1182
1183 match evaluator.evaluate(where_clause, row_idx) {
1184 Ok(result) => {
1185 if row_idx < 3 {
1186 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1187 }
1188 if result {
1189 filtered_rows.push(row_idx);
1190 }
1191 }
1192 Err(e) => {
1193 if row_idx < 3 {
1194 debug!(
1195 "QueryEngine: WHERE evaluation error for row {}: {}",
1196 row_idx, e
1197 );
1198 }
1199 return Err(e);
1201 }
1202 }
1203 }
1204
1205 let (compilations, cache_hits) = eval_context.get_stats();
1207 if compilations > 0 || cache_hits > 0 {
1208 debug!(
1209 "LIKE pattern cache: {} compilations, {} cache hits",
1210 compilations, cache_hits
1211 );
1212 }
1213 visible_rows = filtered_rows;
1214 let filter_duration = filter_start.elapsed();
1215 info!(
1216 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1217 total_rows,
1218 visible_rows.len(),
1219 filter_duration
1220 );
1221
1222 plan.set_rows_out(visible_rows.len());
1223 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1224 plan.add_detail(format!(
1225 "Filter time: {:.3}ms",
1226 filter_duration.as_secs_f64() * 1000.0
1227 ));
1228 plan.end_step();
1229 }
1230
1231 let mut view = DataView::new(table.clone());
1233 view = view.with_rows(visible_rows);
1234
1235 if let Some(group_by_exprs) = &statement.group_by {
1237 if !group_by_exprs.is_empty() {
1238 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1239
1240 plan.begin_step(
1241 StepType::GroupBy,
1242 format!("GROUP BY {} expressions", group_by_exprs.len()),
1243 );
1244 plan.set_rows_in(view.row_count());
1245 plan.add_detail(format!("Input: {} rows", view.row_count()));
1246 for expr in group_by_exprs {
1247 plan.add_detail(format!("Group by: {:?}", expr));
1248 }
1249
1250 let group_start = Instant::now();
1251 view = self.apply_group_by(
1252 view,
1253 group_by_exprs,
1254 &statement.select_items,
1255 statement.having.as_ref(),
1256 plan,
1257 )?;
1258
1259 plan.set_rows_out(view.row_count());
1260 plan.add_detail(format!("Output: {} groups", view.row_count()));
1261 plan.add_detail(format!(
1262 "Overall time: {:.3}ms",
1263 group_start.elapsed().as_secs_f64() * 1000.0
1264 ));
1265 plan.end_step();
1266 }
1267 } else {
1268 if !statement.select_items.is_empty() {
1270 let has_non_star_items = statement
1272 .select_items
1273 .iter()
1274 .any(|item| !matches!(item, SelectItem::Star { .. }));
1275
1276 if has_non_star_items || statement.select_items.len() > 1 {
1280 view = self.apply_select_items(
1281 view,
1282 &statement.select_items,
1283 &statement,
1284 exec_context,
1285 )?;
1286 }
1287 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1289 debug!("QueryEngine: Using legacy columns path");
1290 let source_table = view.source();
1293 let column_indices =
1294 self.resolve_column_indices(source_table, &statement.columns)?;
1295 view = view.with_columns(column_indices);
1296 }
1297 }
1298
1299 if statement.distinct {
1301 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1302 plan.set_rows_in(view.row_count());
1303 plan.add_detail(format!("Input: {} rows", view.row_count()));
1304
1305 let distinct_start = Instant::now();
1306 view = self.apply_distinct(view)?;
1307
1308 plan.set_rows_out(view.row_count());
1309 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1310 plan.add_detail(format!(
1311 "Distinct time: {:.3}ms",
1312 distinct_start.elapsed().as_secs_f64() * 1000.0
1313 ));
1314 plan.end_step();
1315 }
1316
1317 if let Some(order_by_columns) = &statement.order_by {
1319 if !order_by_columns.is_empty() {
1320 plan.begin_step(
1321 StepType::Sort,
1322 format!("ORDER BY {} columns", order_by_columns.len()),
1323 );
1324 plan.set_rows_in(view.row_count());
1325 for col in order_by_columns {
1326 plan.add_detail(format!("{} {:?}", col.column, col.direction));
1327 }
1328
1329 let sort_start = Instant::now();
1330 view =
1331 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1332
1333 plan.add_detail(format!(
1334 "Sort time: {:.3}ms",
1335 sort_start.elapsed().as_secs_f64() * 1000.0
1336 ));
1337 plan.end_step();
1338 }
1339 }
1340
1341 if let Some(limit) = statement.limit {
1343 let offset = statement.offset.unwrap_or(0);
1344 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1345 plan.set_rows_in(view.row_count());
1346 if offset > 0 {
1347 plan.add_detail(format!("OFFSET: {}", offset));
1348 }
1349 view = view.with_limit(limit, offset);
1350 plan.set_rows_out(view.row_count());
1351 plan.add_detail(format!("Output: {} rows", view.row_count()));
1352 plan.end_step();
1353 }
1354
1355 if !statement.set_operations.is_empty() {
1357 plan.begin_step(
1358 StepType::SetOperation,
1359 format!("Process {} set operations", statement.set_operations.len()),
1360 );
1361 plan.set_rows_in(view.row_count());
1362
1363 let mut combined_table = self.materialize_view(view)?;
1365 let first_columns = combined_table.column_names();
1366 let first_column_count = first_columns.len();
1367
1368 let mut needs_deduplication = false;
1370
1371 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1373 let op_start = Instant::now();
1374 plan.begin_step(
1375 StepType::SetOperation,
1376 format!("{:?} operation #{}", operation, idx + 1),
1377 );
1378
1379 let next_view = if let Some(exec_ctx) = exec_context {
1382 self.build_view_internal_with_plan_and_exec(
1383 table.clone(),
1384 *next_statement.clone(),
1385 plan,
1386 Some(exec_ctx),
1387 )?
1388 } else {
1389 self.build_view_internal_with_plan(
1390 table.clone(),
1391 *next_statement.clone(),
1392 plan,
1393 )?
1394 };
1395
1396 let next_table = self.materialize_view(next_view)?;
1398 let next_columns = next_table.column_names();
1399 let next_column_count = next_columns.len();
1400
1401 if first_column_count != next_column_count {
1403 return Err(anyhow!(
1404 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1405 first_column_count,
1406 idx + 2,
1407 next_column_count
1408 ));
1409 }
1410
1411 for (col_idx, (first_col, next_col)) in
1413 first_columns.iter().zip(next_columns.iter()).enumerate()
1414 {
1415 if !first_col.eq_ignore_ascii_case(next_col) {
1416 debug!(
1417 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1418 col_idx + 1,
1419 first_col,
1420 next_col
1421 );
1422 }
1423 }
1424
1425 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1426 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1427
1428 match operation {
1430 SetOperation::UnionAll => {
1431 for row in next_table.rows.iter() {
1433 combined_table.add_row(row.clone());
1434 }
1435 plan.add_detail(format!(
1436 "Result: {} rows (no deduplication)",
1437 combined_table.row_count()
1438 ));
1439 }
1440 SetOperation::Union => {
1441 for row in next_table.rows.iter() {
1443 combined_table.add_row(row.clone());
1444 }
1445 needs_deduplication = true;
1446 plan.add_detail(format!(
1447 "Combined: {} rows (deduplication pending)",
1448 combined_table.row_count()
1449 ));
1450 }
1451 SetOperation::Intersect => {
1452 return Err(anyhow!("INTERSECT is not yet implemented"));
1455 }
1456 SetOperation::Except => {
1457 return Err(anyhow!("EXCEPT is not yet implemented"));
1460 }
1461 }
1462
1463 plan.add_detail(format!(
1464 "Operation time: {:.3}ms",
1465 op_start.elapsed().as_secs_f64() * 1000.0
1466 ));
1467 plan.set_rows_out(combined_table.row_count());
1468 plan.end_step();
1469 }
1470
1471 plan.set_rows_out(combined_table.row_count());
1472 plan.add_detail(format!(
1473 "Combined result: {} rows after {} operations",
1474 combined_table.row_count(),
1475 statement.set_operations.len()
1476 ));
1477 plan.end_step();
1478
1479 view = DataView::new(Arc::new(combined_table));
1481
1482 if needs_deduplication {
1484 plan.begin_step(
1485 StepType::Distinct,
1486 "UNION deduplication - remove duplicate rows".to_string(),
1487 );
1488 plan.set_rows_in(view.row_count());
1489 plan.add_detail(format!("Input: {} rows", view.row_count()));
1490
1491 let distinct_start = Instant::now();
1492 view = self.apply_distinct(view)?;
1493
1494 plan.set_rows_out(view.row_count());
1495 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1496 plan.add_detail(format!(
1497 "Deduplication time: {:.3}ms",
1498 distinct_start.elapsed().as_secs_f64() * 1000.0
1499 ));
1500 plan.end_step();
1501 }
1502 }
1503
1504 Ok(view)
1505 }
1506
1507 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1509 let mut indices = Vec::new();
1510 let table_columns = table.column_names();
1511
1512 for col_name in columns {
1513 let index = table_columns
1514 .iter()
1515 .position(|c| c.eq_ignore_ascii_case(col_name))
1516 .ok_or_else(|| {
1517 let suggestion = self.find_similar_column(table, col_name);
1518 match suggestion {
1519 Some(similar) => anyhow::anyhow!(
1520 "Column '{}' not found. Did you mean '{}'?",
1521 col_name,
1522 similar
1523 ),
1524 None => anyhow::anyhow!("Column '{}' not found", col_name),
1525 }
1526 })?;
1527 indices.push(index);
1528 }
1529
1530 Ok(indices)
1531 }
1532
1533 fn apply_select_items(
1535 &self,
1536 view: DataView,
1537 select_items: &[SelectItem],
1538 _statement: &SelectStatement,
1539 exec_context: Option<&ExecutionContext>,
1540 ) -> Result<DataView> {
1541 debug!(
1542 "QueryEngine::apply_select_items - items: {:?}",
1543 select_items
1544 );
1545 debug!(
1546 "QueryEngine::apply_select_items - input view has {} rows",
1547 view.row_count()
1548 );
1549
1550 let has_unnest = select_items.iter().any(|item| match item {
1552 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1553 _ => false,
1554 });
1555
1556 if has_unnest {
1557 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1558 return self.apply_select_with_row_expansion(view, select_items);
1559 }
1560
1561 let has_aggregates = select_items.iter().any(|item| match item {
1565 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1566 SelectItem::Column { .. } => false,
1567 SelectItem::Star { .. } => false,
1568 });
1569
1570 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1571 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1572 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, });
1575
1576 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1577 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1580 return self.apply_aggregate_select(view, select_items);
1581 }
1582
1583 let has_computed_expressions = select_items
1585 .iter()
1586 .any(|item| matches!(item, SelectItem::Expression { .. }));
1587
1588 debug!(
1589 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1590 has_computed_expressions
1591 );
1592
1593 if !has_computed_expressions {
1594 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1596 return Ok(view.with_columns(column_indices));
1597 }
1598
1599 let source_table = view.source();
1604 let visible_rows = view.visible_row_indices();
1605
1606 let mut computed_table = DataTable::new("query_result");
1609
1610 let mut expanded_items = Vec::new();
1612 for item in select_items {
1613 match item {
1614 SelectItem::Star { table_prefix, .. } => {
1615 if let Some(prefix) = table_prefix {
1616 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
1618 for col in &source_table.columns {
1619 if Self::column_matches_table(col, prefix) {
1620 expanded_items.push(SelectItem::Column {
1621 column: ColumnRef::unquoted(col.name.clone()),
1622 leading_comments: vec![],
1623 trailing_comment: None,
1624 });
1625 }
1626 }
1627 } else {
1628 debug!("QueryEngine::apply_select_items - expanding *");
1630 for col_name in source_table.column_names() {
1631 expanded_items.push(SelectItem::Column {
1632 column: ColumnRef::unquoted(col_name.to_string()),
1633 leading_comments: vec![],
1634 trailing_comment: None,
1635 });
1636 }
1637 }
1638 }
1639 _ => expanded_items.push(item.clone()),
1640 }
1641 }
1642
1643 let mut column_name_counts: std::collections::HashMap<String, usize> =
1645 std::collections::HashMap::new();
1646
1647 for item in &expanded_items {
1648 let base_name = match item {
1649 SelectItem::Column {
1650 column: col_ref, ..
1651 } => col_ref.name.clone(),
1652 SelectItem::Expression { alias, .. } => alias.clone(),
1653 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1654 };
1655
1656 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1658 let column_name = if *count == 0 {
1659 base_name.clone()
1661 } else {
1662 format!("{base_name}_{count}")
1664 };
1665 *count += 1;
1666
1667 computed_table.add_column(DataColumn::new(&column_name));
1668 }
1669
1670 let mut evaluator =
1672 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1673
1674 if let Some(exec_ctx) = exec_context {
1676 let aliases = exec_ctx.get_aliases();
1677 if !aliases.is_empty() {
1678 debug!(
1679 "Applying {} aliases to evaluator: {:?}",
1680 aliases.len(),
1681 aliases
1682 );
1683 evaluator = evaluator.with_table_aliases(aliases);
1684 }
1685 }
1686
1687 for &row_idx in visible_rows {
1688 let mut row_values = Vec::new();
1689
1690 for item in &expanded_items {
1691 let value = match item {
1692 SelectItem::Column {
1693 column: col_ref, ..
1694 } => {
1695 match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1697 Ok(val) => val,
1698 Err(e) => {
1699 return Err(anyhow!(
1700 "Failed to evaluate column {}: {}",
1701 col_ref.to_sql(),
1702 e
1703 ));
1704 }
1705 }
1706 }
1707 SelectItem::Expression { expr, .. } => {
1708 evaluator.evaluate(&expr, row_idx)?
1710 }
1711 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1712 };
1713 row_values.push(value);
1714 }
1715
1716 computed_table
1717 .add_row(DataRow::new(row_values))
1718 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1719 }
1720
1721 Ok(DataView::new(Arc::new(computed_table)))
1724 }
1725
1726 fn apply_select_with_row_expansion(
1728 &self,
1729 view: DataView,
1730 select_items: &[SelectItem],
1731 ) -> Result<DataView> {
1732 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1733
1734 let source_table = view.source();
1735 let visible_rows = view.visible_row_indices();
1736 let expander_registry = RowExpanderRegistry::new();
1737
1738 let mut result_table = DataTable::new("unnest_result");
1740
1741 let mut expanded_items = Vec::new();
1743 for item in select_items {
1744 match item {
1745 SelectItem::Star { table_prefix, .. } => {
1746 if let Some(prefix) = table_prefix {
1747 debug!(
1749 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
1750 prefix
1751 );
1752 for col in &source_table.columns {
1753 if Self::column_matches_table(col, prefix) {
1754 expanded_items.push(SelectItem::Column {
1755 column: ColumnRef::unquoted(col.name.clone()),
1756 leading_comments: vec![],
1757 trailing_comment: None,
1758 });
1759 }
1760 }
1761 } else {
1762 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
1764 for col_name in source_table.column_names() {
1765 expanded_items.push(SelectItem::Column {
1766 column: ColumnRef::unquoted(col_name.to_string()),
1767 leading_comments: vec![],
1768 trailing_comment: None,
1769 });
1770 }
1771 }
1772 }
1773 _ => expanded_items.push(item.clone()),
1774 }
1775 }
1776
1777 for item in &expanded_items {
1779 let column_name = match item {
1780 SelectItem::Column {
1781 column: col_ref, ..
1782 } => col_ref.name.clone(),
1783 SelectItem::Expression { alias, .. } => alias.clone(),
1784 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1785 };
1786 result_table.add_column(DataColumn::new(&column_name));
1787 }
1788
1789 let mut evaluator =
1791 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1792
1793 for &row_idx in visible_rows {
1794 let mut unnest_expansions = Vec::new();
1796 let mut unnest_indices = Vec::new();
1797
1798 for (col_idx, item) in expanded_items.iter().enumerate() {
1799 if let SelectItem::Expression { expr, .. } = item {
1800 if let Some(expansion_result) = self.try_expand_unnest(
1801 &expr,
1802 source_table,
1803 row_idx,
1804 &mut evaluator,
1805 &expander_registry,
1806 )? {
1807 unnest_expansions.push(expansion_result);
1808 unnest_indices.push(col_idx);
1809 }
1810 }
1811 }
1812
1813 let expansion_count = if unnest_expansions.is_empty() {
1815 1 } else {
1817 unnest_expansions
1818 .iter()
1819 .map(|exp| exp.row_count())
1820 .max()
1821 .unwrap_or(1)
1822 };
1823
1824 for output_idx in 0..expansion_count {
1826 let mut row_values = Vec::new();
1827
1828 for (col_idx, item) in expanded_items.iter().enumerate() {
1829 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1831
1832 let value = if let Some(unnest_idx) = unnest_position {
1833 let expansion = &unnest_expansions[unnest_idx];
1835 expansion
1836 .values
1837 .get(output_idx)
1838 .cloned()
1839 .unwrap_or(DataValue::Null)
1840 } else {
1841 match item {
1843 SelectItem::Column {
1844 column: col_ref, ..
1845 } => {
1846 let col_idx =
1847 source_table.get_column_index(&col_ref.name).ok_or_else(
1848 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1849 )?;
1850 let row = source_table
1851 .get_row(row_idx)
1852 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1853 row.get(col_idx)
1854 .ok_or_else(|| {
1855 anyhow::anyhow!("Column {} not found in row", col_idx)
1856 })?
1857 .clone()
1858 }
1859 SelectItem::Expression { expr, .. } => {
1860 evaluator.evaluate(&expr, row_idx)?
1862 }
1863 SelectItem::Star { .. } => unreachable!(),
1864 }
1865 };
1866
1867 row_values.push(value);
1868 }
1869
1870 result_table
1871 .add_row(DataRow::new(row_values))
1872 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1873 }
1874 }
1875
1876 debug!(
1877 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1878 visible_rows.len(),
1879 result_table.row_count()
1880 );
1881
1882 Ok(DataView::new(Arc::new(result_table)))
1883 }
1884
1885 fn try_expand_unnest(
1888 &self,
1889 expr: &SqlExpression,
1890 _source_table: &DataTable,
1891 row_idx: usize,
1892 evaluator: &mut ArithmeticEvaluator,
1893 expander_registry: &RowExpanderRegistry,
1894 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1895 if let SqlExpression::Unnest { column, delimiter } = expr {
1897 let column_value = evaluator.evaluate(column, row_idx)?;
1899
1900 let delimiter_value = DataValue::String(delimiter.clone());
1902
1903 let expander = expander_registry
1905 .get("UNNEST")
1906 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1907
1908 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1910 return Ok(Some(expansion));
1911 }
1912
1913 if let SqlExpression::FunctionCall { name, args, .. } = expr {
1915 if name.to_uppercase() == "UNNEST" {
1916 if args.len() != 2 {
1918 return Err(anyhow::anyhow!(
1919 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1920 ));
1921 }
1922
1923 let column_value = evaluator.evaluate(&args[0], row_idx)?;
1925
1926 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1928
1929 let expander = expander_registry
1931 .get("UNNEST")
1932 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1933
1934 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1936 return Ok(Some(expansion));
1937 }
1938 }
1939
1940 Ok(None)
1941 }
1942
1943 fn apply_aggregate_select(
1945 &self,
1946 view: DataView,
1947 select_items: &[SelectItem],
1948 ) -> Result<DataView> {
1949 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1950
1951 let source_table = view.source();
1952 let mut result_table = DataTable::new("aggregate_result");
1953
1954 for item in select_items {
1956 let column_name = match item {
1957 SelectItem::Expression { alias, .. } => alias.clone(),
1958 _ => unreachable!("Should only have expressions in aggregate-only query"),
1959 };
1960 result_table.add_column(DataColumn::new(&column_name));
1961 }
1962
1963 let visible_rows = view.visible_row_indices().to_vec();
1965 let mut evaluator =
1966 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1967 .with_visible_rows(visible_rows);
1968
1969 let mut row_values = Vec::new();
1971 for item in select_items {
1972 match item {
1973 SelectItem::Expression { expr, .. } => {
1974 let value = evaluator.evaluate(expr, 0)?;
1977 row_values.push(value);
1978 }
1979 _ => unreachable!("Should only have expressions in aggregate-only query"),
1980 }
1981 }
1982
1983 result_table
1985 .add_row(DataRow::new(row_values))
1986 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1987
1988 Ok(DataView::new(Arc::new(result_table)))
1989 }
1990
1991 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2003 if let Some(ref source) = col.source_table {
2005 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2007 return true;
2008 }
2009 }
2010
2011 if let Some(ref qualified) = col.qualified_name {
2013 if qualified.starts_with(&format!("{}.", table_name)) {
2015 return true;
2016 }
2017 }
2018
2019 false
2020 }
2021
2022 fn resolve_select_columns(
2024 &self,
2025 table: &DataTable,
2026 select_items: &[SelectItem],
2027 ) -> Result<Vec<usize>> {
2028 let mut indices = Vec::new();
2029 let table_columns = table.column_names();
2030
2031 for item in select_items {
2032 match item {
2033 SelectItem::Column {
2034 column: col_ref, ..
2035 } => {
2036 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2038 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2040 table.find_column_by_qualified_name(&qualified_name)
2041 .ok_or_else(|| {
2042 let has_qualified = table.columns.iter()
2044 .any(|c| c.qualified_name.is_some());
2045 if !has_qualified {
2046 anyhow::anyhow!(
2047 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2048 qualified_name, table_prefix
2049 )
2050 } else {
2051 anyhow::anyhow!("Column '{}' not found", qualified_name)
2052 }
2053 })?
2054 } else {
2055 table_columns
2057 .iter()
2058 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2059 .ok_or_else(|| {
2060 let suggestion = self.find_similar_column(table, &col_ref.name);
2061 match suggestion {
2062 Some(similar) => anyhow::anyhow!(
2063 "Column '{}' not found. Did you mean '{}'?",
2064 col_ref.name,
2065 similar
2066 ),
2067 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2068 }
2069 })?
2070 };
2071 indices.push(index);
2072 }
2073 SelectItem::Star { table_prefix, .. } => {
2074 if let Some(prefix) = table_prefix {
2075 for (i, col) in table.columns.iter().enumerate() {
2077 if Self::column_matches_table(col, prefix) {
2078 indices.push(i);
2079 }
2080 }
2081 } else {
2082 for i in 0..table_columns.len() {
2084 indices.push(i);
2085 }
2086 }
2087 }
2088 SelectItem::Expression { .. } => {
2089 return Err(anyhow::anyhow!(
2090 "Computed expressions require new table creation"
2091 ));
2092 }
2093 }
2094 }
2095
2096 Ok(indices)
2097 }
2098
2099 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2101 use std::collections::HashSet;
2102
2103 let source = view.source();
2104 let visible_cols = view.visible_column_indices();
2105 let visible_rows = view.visible_row_indices();
2106
2107 let mut seen_rows = HashSet::new();
2109 let mut unique_row_indices = Vec::new();
2110
2111 for &row_idx in visible_rows {
2112 let mut row_key = Vec::new();
2114 for &col_idx in visible_cols {
2115 let value = source
2116 .get_value(row_idx, col_idx)
2117 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2118 row_key.push(format!("{:?}", value));
2120 }
2121
2122 if seen_rows.insert(row_key) {
2124 unique_row_indices.push(row_idx);
2126 }
2127 }
2128
2129 Ok(view.with_rows(unique_row_indices))
2131 }
2132
2133 fn apply_multi_order_by(
2135 &self,
2136 view: DataView,
2137 order_by_columns: &[OrderByColumn],
2138 ) -> Result<DataView> {
2139 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2140 }
2141
2142 fn apply_multi_order_by_with_context(
2144 &self,
2145 mut view: DataView,
2146 order_by_columns: &[OrderByColumn],
2147 _exec_context: Option<&ExecutionContext>,
2148 ) -> Result<DataView> {
2149 let mut sort_columns = Vec::new();
2151
2152 for order_col in order_by_columns {
2153 let col_index = if order_col.column.contains('.') {
2155 if let Some(dot_pos) = order_col.column.rfind('.') {
2157 let col_name = &order_col.column[dot_pos + 1..];
2158
2159 debug!(
2162 "ORDER BY: Extracting unqualified column '{}' from '{}'",
2163 col_name, order_col.column
2164 );
2165 view.source().get_column_index(col_name)
2166 } else {
2167 view.source().get_column_index(&order_col.column)
2168 }
2169 } else {
2170 view.source().get_column_index(&order_col.column)
2172 }
2173 .ok_or_else(|| {
2174 let suggestion = self.find_similar_column(view.source(), &order_col.column);
2176 match suggestion {
2177 Some(similar) => anyhow::anyhow!(
2178 "Column '{}' not found. Did you mean '{}'?",
2179 order_col.column,
2180 similar
2181 ),
2182 None => {
2183 let available_cols = view.source().column_names().join(", ");
2185 anyhow::anyhow!(
2186 "Column '{}' not found. Available columns: {}",
2187 order_col.column,
2188 available_cols
2189 )
2190 }
2191 }
2192 })?;
2193
2194 let ascending = matches!(order_col.direction, SortDirection::Asc);
2195 sort_columns.push((col_index, ascending));
2196 }
2197
2198 view.apply_multi_sort(&sort_columns)?;
2200 Ok(view)
2201 }
2202
2203 fn apply_group_by(
2205 &self,
2206 view: DataView,
2207 group_by_exprs: &[SqlExpression],
2208 select_items: &[SelectItem],
2209 having: Option<&SqlExpression>,
2210 plan: &mut ExecutionPlanBuilder,
2211 ) -> Result<DataView> {
2212 let (result_view, phase_info) = self.apply_group_by_expressions(
2214 view,
2215 group_by_exprs,
2216 select_items,
2217 having,
2218 self.case_insensitive,
2219 self.date_notation.clone(),
2220 )?;
2221
2222 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2224 plan.add_detail(format!(
2225 "Phase 1 - Group Building: {:.3}ms",
2226 phase_info.phase2_key_building.as_secs_f64() * 1000.0
2227 ));
2228 plan.add_detail(format!(
2229 " • Processing {} rows into {} groups",
2230 phase_info.total_rows, phase_info.num_groups
2231 ));
2232 plan.add_detail(format!(
2233 "Phase 2 - Aggregation: {:.3}ms",
2234 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2235 ));
2236 if phase_info.phase4_having_evaluation > Duration::ZERO {
2237 plan.add_detail(format!(
2238 "Phase 3 - HAVING Filter: {:.3}ms",
2239 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2240 ));
2241 plan.add_detail(format!(
2242 " • Filtered {} groups",
2243 phase_info.groups_filtered_by_having
2244 ));
2245 }
2246 plan.add_detail(format!(
2247 "Total GROUP BY time: {:.3}ms",
2248 phase_info.total_time.as_secs_f64() * 1000.0
2249 ));
2250
2251 Ok(result_view)
2252 }
2253
2254 pub fn estimate_group_cardinality(
2257 &self,
2258 view: &DataView,
2259 group_by_exprs: &[SqlExpression],
2260 ) -> usize {
2261 let row_count = view.get_visible_rows().len();
2263 if row_count <= 100 {
2264 return row_count;
2265 }
2266
2267 let sample_size = min(1000, row_count / 10).max(100);
2269 let mut seen = FxHashSet::default();
2270
2271 let visible_rows = view.get_visible_rows();
2272 for (i, &row_idx) in visible_rows.iter().enumerate() {
2273 if i >= sample_size {
2274 break;
2275 }
2276
2277 let mut key_values = Vec::new();
2279 for expr in group_by_exprs {
2280 let mut evaluator = ArithmeticEvaluator::new(view.source());
2281 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2282 key_values.push(value);
2283 }
2284
2285 seen.insert(key_values);
2286 }
2287
2288 let sample_cardinality = seen.len();
2290 let estimated = (sample_cardinality * row_count) / sample_size;
2291
2292 estimated.min(row_count).max(sample_cardinality)
2294 }
2295}
2296
2297#[cfg(test)]
2298mod tests {
2299 use super::*;
2300 use crate::data::datatable::{DataColumn, DataRow, DataValue};
2301
2302 fn create_test_table() -> Arc<DataTable> {
2303 let mut table = DataTable::new("test");
2304
2305 table.add_column(DataColumn::new("id"));
2307 table.add_column(DataColumn::new("name"));
2308 table.add_column(DataColumn::new("age"));
2309
2310 table
2312 .add_row(DataRow::new(vec![
2313 DataValue::Integer(1),
2314 DataValue::String("Alice".to_string()),
2315 DataValue::Integer(30),
2316 ]))
2317 .unwrap();
2318
2319 table
2320 .add_row(DataRow::new(vec![
2321 DataValue::Integer(2),
2322 DataValue::String("Bob".to_string()),
2323 DataValue::Integer(25),
2324 ]))
2325 .unwrap();
2326
2327 table
2328 .add_row(DataRow::new(vec![
2329 DataValue::Integer(3),
2330 DataValue::String("Charlie".to_string()),
2331 DataValue::Integer(35),
2332 ]))
2333 .unwrap();
2334
2335 Arc::new(table)
2336 }
2337
2338 #[test]
2339 fn test_select_all() {
2340 let table = create_test_table();
2341 let engine = QueryEngine::new();
2342
2343 let view = engine
2344 .execute(table.clone(), "SELECT * FROM users")
2345 .unwrap();
2346 assert_eq!(view.row_count(), 3);
2347 assert_eq!(view.column_count(), 3);
2348 }
2349
2350 #[test]
2351 fn test_select_columns() {
2352 let table = create_test_table();
2353 let engine = QueryEngine::new();
2354
2355 let view = engine
2356 .execute(table.clone(), "SELECT name, age FROM users")
2357 .unwrap();
2358 assert_eq!(view.row_count(), 3);
2359 assert_eq!(view.column_count(), 2);
2360 }
2361
2362 #[test]
2363 fn test_select_with_limit() {
2364 let table = create_test_table();
2365 let engine = QueryEngine::new();
2366
2367 let view = engine
2368 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
2369 .unwrap();
2370 assert_eq!(view.row_count(), 2);
2371 }
2372
2373 #[test]
2374 fn test_type_coercion_contains() {
2375 let _ = tracing_subscriber::fmt()
2377 .with_max_level(tracing::Level::DEBUG)
2378 .try_init();
2379
2380 let mut table = DataTable::new("test");
2381 table.add_column(DataColumn::new("id"));
2382 table.add_column(DataColumn::new("status"));
2383 table.add_column(DataColumn::new("price"));
2384
2385 table
2387 .add_row(DataRow::new(vec![
2388 DataValue::Integer(1),
2389 DataValue::String("Pending".to_string()),
2390 DataValue::Float(99.99),
2391 ]))
2392 .unwrap();
2393
2394 table
2395 .add_row(DataRow::new(vec![
2396 DataValue::Integer(2),
2397 DataValue::String("Confirmed".to_string()),
2398 DataValue::Float(150.50),
2399 ]))
2400 .unwrap();
2401
2402 table
2403 .add_row(DataRow::new(vec![
2404 DataValue::Integer(3),
2405 DataValue::String("Pending".to_string()),
2406 DataValue::Float(75.00),
2407 ]))
2408 .unwrap();
2409
2410 let table = Arc::new(table);
2411 let engine = QueryEngine::new();
2412
2413 println!("\n=== Testing WHERE clause with Contains ===");
2414 println!("Table has {} rows", table.row_count());
2415 for i in 0..table.row_count() {
2416 let status = table.get_value(i, 1);
2417 println!("Row {i}: status = {status:?}");
2418 }
2419
2420 println!("\n--- Test 1: status.Contains('pend') ---");
2422 let result = engine.execute(
2423 table.clone(),
2424 "SELECT * FROM test WHERE status.Contains('pend')",
2425 );
2426 match result {
2427 Ok(view) => {
2428 println!("SUCCESS: Found {} matching rows", view.row_count());
2429 assert_eq!(view.row_count(), 2); }
2431 Err(e) => {
2432 panic!("Query failed: {e}");
2433 }
2434 }
2435
2436 println!("\n--- Test 2: price.Contains('9') ---");
2438 let result = engine.execute(
2439 table.clone(),
2440 "SELECT * FROM test WHERE price.Contains('9')",
2441 );
2442 match result {
2443 Ok(view) => {
2444 println!(
2445 "SUCCESS: Found {} matching rows with price containing '9'",
2446 view.row_count()
2447 );
2448 assert!(view.row_count() >= 1);
2450 }
2451 Err(e) => {
2452 panic!("Numeric coercion query failed: {e}");
2453 }
2454 }
2455
2456 println!("\n=== All tests passed! ===");
2457 }
2458
2459 #[test]
2460 fn test_not_in_clause() {
2461 let _ = tracing_subscriber::fmt()
2463 .with_max_level(tracing::Level::DEBUG)
2464 .try_init();
2465
2466 let mut table = DataTable::new("test");
2467 table.add_column(DataColumn::new("id"));
2468 table.add_column(DataColumn::new("country"));
2469
2470 table
2472 .add_row(DataRow::new(vec![
2473 DataValue::Integer(1),
2474 DataValue::String("CA".to_string()),
2475 ]))
2476 .unwrap();
2477
2478 table
2479 .add_row(DataRow::new(vec![
2480 DataValue::Integer(2),
2481 DataValue::String("US".to_string()),
2482 ]))
2483 .unwrap();
2484
2485 table
2486 .add_row(DataRow::new(vec![
2487 DataValue::Integer(3),
2488 DataValue::String("UK".to_string()),
2489 ]))
2490 .unwrap();
2491
2492 let table = Arc::new(table);
2493 let engine = QueryEngine::new();
2494
2495 println!("\n=== Testing NOT IN clause ===");
2496 println!("Table has {} rows", table.row_count());
2497 for i in 0..table.row_count() {
2498 let country = table.get_value(i, 1);
2499 println!("Row {i}: country = {country:?}");
2500 }
2501
2502 println!("\n--- Test: country NOT IN ('CA') ---");
2504 let result = engine.execute(
2505 table.clone(),
2506 "SELECT * FROM test WHERE country NOT IN ('CA')",
2507 );
2508 match result {
2509 Ok(view) => {
2510 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
2511 assert_eq!(view.row_count(), 2); }
2513 Err(e) => {
2514 panic!("NOT IN query failed: {e}");
2515 }
2516 }
2517
2518 println!("\n=== NOT IN test complete! ===");
2519 }
2520
2521 #[test]
2522 fn test_case_insensitive_in_and_not_in() {
2523 let _ = tracing_subscriber::fmt()
2525 .with_max_level(tracing::Level::DEBUG)
2526 .try_init();
2527
2528 let mut table = DataTable::new("test");
2529 table.add_column(DataColumn::new("id"));
2530 table.add_column(DataColumn::new("country"));
2531
2532 table
2534 .add_row(DataRow::new(vec![
2535 DataValue::Integer(1),
2536 DataValue::String("CA".to_string()), ]))
2538 .unwrap();
2539
2540 table
2541 .add_row(DataRow::new(vec![
2542 DataValue::Integer(2),
2543 DataValue::String("us".to_string()), ]))
2545 .unwrap();
2546
2547 table
2548 .add_row(DataRow::new(vec![
2549 DataValue::Integer(3),
2550 DataValue::String("UK".to_string()), ]))
2552 .unwrap();
2553
2554 let table = Arc::new(table);
2555
2556 println!("\n=== Testing Case-Insensitive IN clause ===");
2557 println!("Table has {} rows", table.row_count());
2558 for i in 0..table.row_count() {
2559 let country = table.get_value(i, 1);
2560 println!("Row {i}: country = {country:?}");
2561 }
2562
2563 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
2565 let engine = QueryEngine::with_case_insensitive(true);
2566 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2567 match result {
2568 Ok(view) => {
2569 println!(
2570 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
2571 view.row_count()
2572 );
2573 assert_eq!(view.row_count(), 1); }
2575 Err(e) => {
2576 panic!("Case-insensitive IN query failed: {e}");
2577 }
2578 }
2579
2580 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
2582 let result = engine.execute(
2583 table.clone(),
2584 "SELECT * FROM test WHERE country NOT IN ('ca')",
2585 );
2586 match result {
2587 Ok(view) => {
2588 println!(
2589 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
2590 view.row_count()
2591 );
2592 assert_eq!(view.row_count(), 2); }
2594 Err(e) => {
2595 panic!("Case-insensitive NOT IN query failed: {e}");
2596 }
2597 }
2598
2599 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
2601 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
2603 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2604 match result {
2605 Ok(view) => {
2606 println!(
2607 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
2608 view.row_count()
2609 );
2610 assert_eq!(view.row_count(), 0); }
2612 Err(e) => {
2613 panic!("Case-sensitive IN query failed: {e}");
2614 }
2615 }
2616
2617 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
2618 }
2619
2620 #[test]
2621 #[ignore = "Parentheses in WHERE clause not yet implemented"]
2622 fn test_parentheses_in_where_clause() {
2623 let _ = tracing_subscriber::fmt()
2625 .with_max_level(tracing::Level::DEBUG)
2626 .try_init();
2627
2628 let mut table = DataTable::new("test");
2629 table.add_column(DataColumn::new("id"));
2630 table.add_column(DataColumn::new("status"));
2631 table.add_column(DataColumn::new("priority"));
2632
2633 table
2635 .add_row(DataRow::new(vec![
2636 DataValue::Integer(1),
2637 DataValue::String("Pending".to_string()),
2638 DataValue::String("High".to_string()),
2639 ]))
2640 .unwrap();
2641
2642 table
2643 .add_row(DataRow::new(vec![
2644 DataValue::Integer(2),
2645 DataValue::String("Complete".to_string()),
2646 DataValue::String("High".to_string()),
2647 ]))
2648 .unwrap();
2649
2650 table
2651 .add_row(DataRow::new(vec![
2652 DataValue::Integer(3),
2653 DataValue::String("Pending".to_string()),
2654 DataValue::String("Low".to_string()),
2655 ]))
2656 .unwrap();
2657
2658 table
2659 .add_row(DataRow::new(vec![
2660 DataValue::Integer(4),
2661 DataValue::String("Complete".to_string()),
2662 DataValue::String("Low".to_string()),
2663 ]))
2664 .unwrap();
2665
2666 let table = Arc::new(table);
2667 let engine = QueryEngine::new();
2668
2669 println!("\n=== Testing Parentheses in WHERE clause ===");
2670 println!("Table has {} rows", table.row_count());
2671 for i in 0..table.row_count() {
2672 let status = table.get_value(i, 1);
2673 let priority = table.get_value(i, 2);
2674 println!("Row {i}: status = {status:?}, priority = {priority:?}");
2675 }
2676
2677 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2679 let result = engine.execute(
2680 table.clone(),
2681 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2682 );
2683 match result {
2684 Ok(view) => {
2685 println!(
2686 "SUCCESS: Found {} rows with parenthetical logic",
2687 view.row_count()
2688 );
2689 assert_eq!(view.row_count(), 2); }
2691 Err(e) => {
2692 panic!("Parentheses query failed: {e}");
2693 }
2694 }
2695
2696 println!("\n=== Parentheses test complete! ===");
2697 }
2698
2699 #[test]
2700 #[ignore = "Numeric type coercion needs fixing"]
2701 fn test_numeric_type_coercion() {
2702 let _ = tracing_subscriber::fmt()
2704 .with_max_level(tracing::Level::DEBUG)
2705 .try_init();
2706
2707 let mut table = DataTable::new("test");
2708 table.add_column(DataColumn::new("id"));
2709 table.add_column(DataColumn::new("price"));
2710 table.add_column(DataColumn::new("quantity"));
2711
2712 table
2714 .add_row(DataRow::new(vec![
2715 DataValue::Integer(1),
2716 DataValue::Float(99.50), DataValue::Integer(100),
2718 ]))
2719 .unwrap();
2720
2721 table
2722 .add_row(DataRow::new(vec![
2723 DataValue::Integer(2),
2724 DataValue::Float(150.0), DataValue::Integer(200),
2726 ]))
2727 .unwrap();
2728
2729 table
2730 .add_row(DataRow::new(vec![
2731 DataValue::Integer(3),
2732 DataValue::Integer(75), DataValue::Integer(50),
2734 ]))
2735 .unwrap();
2736
2737 let table = Arc::new(table);
2738 let engine = QueryEngine::new();
2739
2740 println!("\n=== Testing Numeric Type Coercion ===");
2741 println!("Table has {} rows", table.row_count());
2742 for i in 0..table.row_count() {
2743 let price = table.get_value(i, 1);
2744 let quantity = table.get_value(i, 2);
2745 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
2746 }
2747
2748 println!("\n--- Test: price.Contains('.') ---");
2750 let result = engine.execute(
2751 table.clone(),
2752 "SELECT * FROM test WHERE price.Contains('.')",
2753 );
2754 match result {
2755 Ok(view) => {
2756 println!(
2757 "SUCCESS: Found {} rows with decimal points in price",
2758 view.row_count()
2759 );
2760 assert_eq!(view.row_count(), 2); }
2762 Err(e) => {
2763 panic!("Numeric Contains query failed: {e}");
2764 }
2765 }
2766
2767 println!("\n--- Test: quantity.Contains('0') ---");
2769 let result = engine.execute(
2770 table.clone(),
2771 "SELECT * FROM test WHERE quantity.Contains('0')",
2772 );
2773 match result {
2774 Ok(view) => {
2775 println!(
2776 "SUCCESS: Found {} rows with '0' in quantity",
2777 view.row_count()
2778 );
2779 assert_eq!(view.row_count(), 2); }
2781 Err(e) => {
2782 panic!("Integer Contains query failed: {e}");
2783 }
2784 }
2785
2786 println!("\n=== Numeric type coercion test complete! ===");
2787 }
2788
2789 #[test]
2790 fn test_datetime_comparisons() {
2791 let _ = tracing_subscriber::fmt()
2793 .with_max_level(tracing::Level::DEBUG)
2794 .try_init();
2795
2796 let mut table = DataTable::new("test");
2797 table.add_column(DataColumn::new("id"));
2798 table.add_column(DataColumn::new("created_date"));
2799
2800 table
2802 .add_row(DataRow::new(vec![
2803 DataValue::Integer(1),
2804 DataValue::String("2024-12-15".to_string()),
2805 ]))
2806 .unwrap();
2807
2808 table
2809 .add_row(DataRow::new(vec![
2810 DataValue::Integer(2),
2811 DataValue::String("2025-01-15".to_string()),
2812 ]))
2813 .unwrap();
2814
2815 table
2816 .add_row(DataRow::new(vec![
2817 DataValue::Integer(3),
2818 DataValue::String("2025-02-15".to_string()),
2819 ]))
2820 .unwrap();
2821
2822 let table = Arc::new(table);
2823 let engine = QueryEngine::new();
2824
2825 println!("\n=== Testing DateTime Comparisons ===");
2826 println!("Table has {} rows", table.row_count());
2827 for i in 0..table.row_count() {
2828 let date = table.get_value(i, 1);
2829 println!("Row {i}: created_date = {date:?}");
2830 }
2831
2832 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2834 let result = engine.execute(
2835 table.clone(),
2836 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2837 );
2838 match result {
2839 Ok(view) => {
2840 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2841 assert_eq!(view.row_count(), 2); }
2843 Err(e) => {
2844 panic!("DateTime comparison query failed: {e}");
2845 }
2846 }
2847
2848 println!("\n=== DateTime comparison test complete! ===");
2849 }
2850
2851 #[test]
2852 fn test_not_with_method_calls() {
2853 let _ = tracing_subscriber::fmt()
2855 .with_max_level(tracing::Level::DEBUG)
2856 .try_init();
2857
2858 let mut table = DataTable::new("test");
2859 table.add_column(DataColumn::new("id"));
2860 table.add_column(DataColumn::new("status"));
2861
2862 table
2864 .add_row(DataRow::new(vec![
2865 DataValue::Integer(1),
2866 DataValue::String("Pending Review".to_string()),
2867 ]))
2868 .unwrap();
2869
2870 table
2871 .add_row(DataRow::new(vec![
2872 DataValue::Integer(2),
2873 DataValue::String("Complete".to_string()),
2874 ]))
2875 .unwrap();
2876
2877 table
2878 .add_row(DataRow::new(vec![
2879 DataValue::Integer(3),
2880 DataValue::String("Pending Approval".to_string()),
2881 ]))
2882 .unwrap();
2883
2884 let table = Arc::new(table);
2885 let engine = QueryEngine::with_case_insensitive(true);
2886
2887 println!("\n=== Testing NOT with Method Calls ===");
2888 println!("Table has {} rows", table.row_count());
2889 for i in 0..table.row_count() {
2890 let status = table.get_value(i, 1);
2891 println!("Row {i}: status = {status:?}");
2892 }
2893
2894 println!("\n--- Test: NOT status.Contains('pend') ---");
2896 let result = engine.execute(
2897 table.clone(),
2898 "SELECT * FROM test WHERE NOT status.Contains('pend')",
2899 );
2900 match result {
2901 Ok(view) => {
2902 println!(
2903 "SUCCESS: Found {} rows NOT containing 'pend'",
2904 view.row_count()
2905 );
2906 assert_eq!(view.row_count(), 1); }
2908 Err(e) => {
2909 panic!("NOT Contains query failed: {e}");
2910 }
2911 }
2912
2913 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2915 let result = engine.execute(
2916 table.clone(),
2917 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2918 );
2919 match result {
2920 Ok(view) => {
2921 println!(
2922 "SUCCESS: Found {} rows NOT starting with 'Pending'",
2923 view.row_count()
2924 );
2925 assert_eq!(view.row_count(), 1); }
2927 Err(e) => {
2928 panic!("NOT StartsWith query failed: {e}");
2929 }
2930 }
2931
2932 println!("\n=== NOT with method calls test complete! ===");
2933 }
2934
2935 #[test]
2936 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2937 fn test_complex_logical_expressions() {
2938 let _ = tracing_subscriber::fmt()
2940 .with_max_level(tracing::Level::DEBUG)
2941 .try_init();
2942
2943 let mut table = DataTable::new("test");
2944 table.add_column(DataColumn::new("id"));
2945 table.add_column(DataColumn::new("status"));
2946 table.add_column(DataColumn::new("priority"));
2947 table.add_column(DataColumn::new("assigned"));
2948
2949 table
2951 .add_row(DataRow::new(vec![
2952 DataValue::Integer(1),
2953 DataValue::String("Pending".to_string()),
2954 DataValue::String("High".to_string()),
2955 DataValue::String("John".to_string()),
2956 ]))
2957 .unwrap();
2958
2959 table
2960 .add_row(DataRow::new(vec![
2961 DataValue::Integer(2),
2962 DataValue::String("Complete".to_string()),
2963 DataValue::String("High".to_string()),
2964 DataValue::String("Jane".to_string()),
2965 ]))
2966 .unwrap();
2967
2968 table
2969 .add_row(DataRow::new(vec![
2970 DataValue::Integer(3),
2971 DataValue::String("Pending".to_string()),
2972 DataValue::String("Low".to_string()),
2973 DataValue::String("John".to_string()),
2974 ]))
2975 .unwrap();
2976
2977 table
2978 .add_row(DataRow::new(vec![
2979 DataValue::Integer(4),
2980 DataValue::String("In Progress".to_string()),
2981 DataValue::String("Medium".to_string()),
2982 DataValue::String("Jane".to_string()),
2983 ]))
2984 .unwrap();
2985
2986 let table = Arc::new(table);
2987 let engine = QueryEngine::new();
2988
2989 println!("\n=== Testing Complex Logical Expressions ===");
2990 println!("Table has {} rows", table.row_count());
2991 for i in 0..table.row_count() {
2992 let status = table.get_value(i, 1);
2993 let priority = table.get_value(i, 2);
2994 let assigned = table.get_value(i, 3);
2995 println!(
2996 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2997 );
2998 }
2999
3000 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3002 let result = engine.execute(
3003 table.clone(),
3004 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3005 );
3006 match result {
3007 Ok(view) => {
3008 println!(
3009 "SUCCESS: Found {} rows with complex logic",
3010 view.row_count()
3011 );
3012 assert_eq!(view.row_count(), 2); }
3014 Err(e) => {
3015 panic!("Complex logic query failed: {e}");
3016 }
3017 }
3018
3019 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3021 let result = engine.execute(
3022 table.clone(),
3023 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3024 );
3025 match result {
3026 Ok(view) => {
3027 println!(
3028 "SUCCESS: Found {} rows with NOT complex logic",
3029 view.row_count()
3030 );
3031 assert_eq!(view.row_count(), 2); }
3033 Err(e) => {
3034 panic!("NOT complex logic query failed: {e}");
3035 }
3036 }
3037
3038 println!("\n=== Complex logical expressions test complete! ===");
3039 }
3040
3041 #[test]
3042 fn test_mixed_data_types_and_edge_cases() {
3043 let _ = tracing_subscriber::fmt()
3045 .with_max_level(tracing::Level::DEBUG)
3046 .try_init();
3047
3048 let mut table = DataTable::new("test");
3049 table.add_column(DataColumn::new("id"));
3050 table.add_column(DataColumn::new("value"));
3051 table.add_column(DataColumn::new("nullable_field"));
3052
3053 table
3055 .add_row(DataRow::new(vec![
3056 DataValue::Integer(1),
3057 DataValue::String("123.45".to_string()),
3058 DataValue::String("present".to_string()),
3059 ]))
3060 .unwrap();
3061
3062 table
3063 .add_row(DataRow::new(vec![
3064 DataValue::Integer(2),
3065 DataValue::Float(678.90),
3066 DataValue::Null,
3067 ]))
3068 .unwrap();
3069
3070 table
3071 .add_row(DataRow::new(vec![
3072 DataValue::Integer(3),
3073 DataValue::Boolean(true),
3074 DataValue::String("also present".to_string()),
3075 ]))
3076 .unwrap();
3077
3078 table
3079 .add_row(DataRow::new(vec![
3080 DataValue::Integer(4),
3081 DataValue::String("false".to_string()),
3082 DataValue::Null,
3083 ]))
3084 .unwrap();
3085
3086 let table = Arc::new(table);
3087 let engine = QueryEngine::new();
3088
3089 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3090 println!("Table has {} rows", table.row_count());
3091 for i in 0..table.row_count() {
3092 let value = table.get_value(i, 1);
3093 let nullable = table.get_value(i, 2);
3094 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3095 }
3096
3097 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3099 let result = engine.execute(
3100 table.clone(),
3101 "SELECT * FROM test WHERE value.Contains('true')",
3102 );
3103 match result {
3104 Ok(view) => {
3105 println!(
3106 "SUCCESS: Found {} rows with boolean coercion",
3107 view.row_count()
3108 );
3109 assert_eq!(view.row_count(), 1); }
3111 Err(e) => {
3112 panic!("Boolean coercion query failed: {e}");
3113 }
3114 }
3115
3116 println!("\n--- Test: id IN (1, 3) ---");
3118 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3119 match result {
3120 Ok(view) => {
3121 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3122 assert_eq!(view.row_count(), 2); }
3124 Err(e) => {
3125 panic!("Multiple IN values query failed: {e}");
3126 }
3127 }
3128
3129 println!("\n=== Mixed data types test complete! ===");
3130 }
3131
3132 #[test]
3134 fn test_aggregate_only_single_row() {
3135 let table = create_test_stock_data();
3136 let engine = QueryEngine::new();
3137
3138 let result = engine
3140 .execute(
3141 table.clone(),
3142 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3143 )
3144 .expect("Query should succeed");
3145
3146 assert_eq!(
3147 result.row_count(),
3148 1,
3149 "Aggregate-only query should return exactly 1 row"
3150 );
3151 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3152
3153 let source = result.source();
3155 let row = source.get_row(0).expect("Should have first row");
3156
3157 assert_eq!(row.values[0], DataValue::Integer(5));
3159
3160 assert_eq!(row.values[1], DataValue::Float(99.5));
3162
3163 assert_eq!(row.values[2], DataValue::Float(105.0));
3165
3166 if let DataValue::Float(avg) = &row.values[3] {
3168 assert!(
3169 (avg - 102.4).abs() < 0.01,
3170 "Average should be approximately 102.4, got {}",
3171 avg
3172 );
3173 } else {
3174 panic!("AVG should return a Float value");
3175 }
3176 }
3177
3178 #[test]
3180 fn test_single_aggregate_single_row() {
3181 let table = create_test_stock_data();
3182 let engine = QueryEngine::new();
3183
3184 let result = engine
3185 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3186 .expect("Query should succeed");
3187
3188 assert_eq!(
3189 result.row_count(),
3190 1,
3191 "Single aggregate query should return exactly 1 row"
3192 );
3193 assert_eq!(result.column_count(), 1, "Should have 1 column");
3194
3195 let source = result.source();
3196 let row = source.get_row(0).expect("Should have first row");
3197 assert_eq!(row.values[0], DataValue::Integer(5));
3198 }
3199
3200 #[test]
3202 fn test_aggregate_with_where_single_row() {
3203 let table = create_test_stock_data();
3204 let engine = QueryEngine::new();
3205
3206 let result = engine
3208 .execute(
3209 table.clone(),
3210 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3211 )
3212 .expect("Query should succeed");
3213
3214 assert_eq!(
3215 result.row_count(),
3216 1,
3217 "Filtered aggregate query should return exactly 1 row"
3218 );
3219 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3220
3221 let source = result.source();
3222 let row = source.get_row(0).expect("Should have first row");
3223
3224 assert_eq!(row.values[0], DataValue::Integer(2));
3226 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
3229
3230 #[test]
3231 fn test_not_in_parsing() {
3232 use crate::sql::recursive_parser::Parser;
3233
3234 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3235 println!("\n=== Testing NOT IN parsing ===");
3236 println!("Parsing query: {query}");
3237
3238 let mut parser = Parser::new(query);
3239 match parser.parse() {
3240 Ok(statement) => {
3241 println!("Parsed statement: {statement:#?}");
3242 if let Some(where_clause) = statement.where_clause {
3243 println!("WHERE conditions: {:#?}", where_clause.conditions);
3244 if let Some(first_condition) = where_clause.conditions.first() {
3245 println!("First condition expression: {:#?}", first_condition.expr);
3246 }
3247 }
3248 }
3249 Err(e) => {
3250 panic!("Parse error: {e}");
3251 }
3252 }
3253 }
3254
3255 fn create_test_stock_data() -> Arc<DataTable> {
3257 let mut table = DataTable::new("stock");
3258
3259 table.add_column(DataColumn::new("symbol"));
3260 table.add_column(DataColumn::new("close"));
3261 table.add_column(DataColumn::new("volume"));
3262
3263 let test_data = vec![
3265 ("AAPL", 99.5, 1000),
3266 ("AAPL", 101.2, 1500),
3267 ("AAPL", 103.5, 2000),
3268 ("AAPL", 105.0, 1200),
3269 ("AAPL", 102.8, 1800),
3270 ];
3271
3272 for (symbol, close, volume) in test_data {
3273 table
3274 .add_row(DataRow::new(vec![
3275 DataValue::String(symbol.to_string()),
3276 DataValue::Float(close),
3277 DataValue::Integer(volume),
3278 ]))
3279 .expect("Should add row successfully");
3280 }
3281
3282 Arc::new(table)
3283 }
3284}
3285
// Additional tests for this module, compiled only in test builds and loaded
// from the file named in the `#[path]` attribute.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;