1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::recursive_parser::{
27 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
28 TableFunction,
29};
30
/// Per-query state used while resolving table and column references.
///
/// Currently this only tracks table aliases (e.g. `FROM orders o` registers
/// `o -> orders`) so that qualified column references such as `o.id` can be
/// mapped back to the underlying table name.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    // Alias -> actual table name.
    alias_map: HashMap<String, String>,
}
38
39impl ExecutionContext {
40 pub fn new() -> Self {
42 Self {
43 alias_map: HashMap::new(),
44 }
45 }
46
47 pub fn register_alias(&mut self, alias: String, table_name: String) {
49 debug!("Registering alias: {} -> {}", alias, table_name);
50 self.alias_map.insert(alias, table_name);
51 }
52
53 pub fn resolve_alias(&self, name: &str) -> String {
56 self.alias_map
57 .get(name)
58 .cloned()
59 .unwrap_or_else(|| name.to_string())
60 }
61
62 pub fn is_alias(&self, name: &str) -> bool {
64 self.alias_map.contains_key(name)
65 }
66
67 pub fn get_aliases(&self) -> HashMap<String, String> {
69 self.alias_map.clone()
70 }
71
72 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
87 if let Some(table_prefix) = &column_ref.table_prefix {
88 let actual_table = self.resolve_alias(table_prefix);
90
91 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
93 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
94 debug!(
95 "Resolved {}.{} -> qualified column '{}' at index {}",
96 table_prefix, column_ref.name, qualified_name, idx
97 );
98 return Ok(idx);
99 }
100
101 if let Some(idx) = table.get_column_index(&column_ref.name) {
103 debug!(
104 "Resolved {}.{} -> unqualified column '{}' at index {}",
105 table_prefix, column_ref.name, column_ref.name, idx
106 );
107 return Ok(idx);
108 }
109
110 Err(anyhow!(
112 "Column '{}' not found. Table '{}' may not support qualified column names",
113 qualified_name,
114 actual_table
115 ))
116 } else {
117 if let Some(idx) = table.get_column_index(&column_ref.name) {
119 debug!(
120 "Resolved unqualified column '{}' at index {}",
121 column_ref.name, idx
122 );
123 return Ok(idx);
124 }
125
126 if column_ref.name.contains('.') {
128 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
129 debug!(
130 "Resolved '{}' as qualified column at index {}",
131 column_ref.name, idx
132 );
133 return Ok(idx);
134 }
135 }
136
137 let suggestion = self.find_similar_column(table, &column_ref.name);
139 match suggestion {
140 Some(similar) => Err(anyhow!(
141 "Column '{}' not found. Did you mean '{}'?",
142 column_ref.name,
143 similar
144 )),
145 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
146 }
147 }
148 }
149
150 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
152 let columns = table.column_names();
153 let mut best_match: Option<(String, usize)> = None;
154
155 for col in columns {
156 let distance = edit_distance(name, &col);
157 if distance <= 2 {
158 match best_match {
160 Some((_, best_dist)) if distance < best_dist => {
161 best_match = Some((col.clone(), distance));
162 }
163 None => {
164 best_match = Some((col.clone(), distance));
165 }
166 _ => {}
167 }
168 }
169 }
170
171 best_match.map(|(name, _)| name)
172 }
173}
174
175impl Default for ExecutionContext {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
/// Levenshtein distance between `a` and `b`, counted in `char`s (Unicode
/// scalar values), not bytes.
///
/// This is the minimum number of single-character insertions, deletions and
/// substitutions needed to turn `a` into `b`.
fn edit_distance(a: &str, b: &str) -> usize {
    let left: Vec<char> = a.chars().collect();
    let right: Vec<char> = b.chars().collect();

    // Only two rows of the DP table are kept. `prev[j]` is the distance between
    // the current prefix of `left` and the first `j` chars of `right`. The
    // initial row (the empty prefix of `left`) is just 0..=len(right), which
    // also covers the empty-input cases.
    let mut prev: Vec<usize> = (0..=right.len()).collect();
    let mut curr: Vec<usize> = vec![0; right.len() + 1];

    for (i, &lc) in left.iter().enumerate() {
        curr[0] = i + 1;
        for (j, &rc) in right.iter().enumerate() {
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            let substitution = prev[j] + usize::from(lc != rc);
            curr[j + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    prev[right.len()]
}
221
/// Executes SQL `SELECT` statements against in-memory `DataTable`s and
/// produces `DataView`s (optionally together with an `ExecutionPlan`).
#[derive(Clone)]
pub struct QueryEngine {
    // Passed to the hash-join executor and to the WHERE evaluation context to
    // control case sensitivity.
    case_insensitive: bool,
    // Date notation handed to the arithmetic evaluator. Every constructor in
    // this file initialises it from the global `get_date_notation()` setting.
    date_notation: String,
    // Config supplied via `with_behavior_config`. It is not read anywhere in the
    // code visible in this chunk (hence the leading underscore).
    _behavior_config: Option<BehaviorConfig>,
}
229
230impl Default for QueryEngine {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl QueryEngine {
237 #[must_use]
238 pub fn new() -> Self {
239 Self {
240 case_insensitive: false,
241 date_notation: get_date_notation(),
242 _behavior_config: None,
243 }
244 }
245
246 #[must_use]
247 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
248 let case_insensitive = config.case_insensitive_default;
249 let date_notation = get_date_notation();
251 Self {
252 case_insensitive,
253 date_notation,
254 _behavior_config: Some(config),
255 }
256 }
257
258 #[must_use]
259 pub fn with_date_notation(_date_notation: String) -> Self {
260 Self {
261 case_insensitive: false,
262 date_notation: get_date_notation(), _behavior_config: None,
264 }
265 }
266
267 #[must_use]
268 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
269 Self {
270 case_insensitive,
271 date_notation: get_date_notation(),
272 _behavior_config: None,
273 }
274 }
275
276 #[must_use]
277 pub fn with_case_insensitive_and_date_notation(
278 case_insensitive: bool,
279 _date_notation: String, ) -> Self {
281 Self {
282 case_insensitive,
283 date_notation: get_date_notation(), _behavior_config: None,
285 }
286 }
287
288 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
290 let columns = table.column_names();
291 let mut best_match: Option<(String, usize)> = None;
292
293 for col in columns {
294 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
295 let max_distance = if name.len() > 10 { 3 } else { 2 };
298 if distance <= max_distance {
299 match &best_match {
300 None => best_match = Some((col, distance)),
301 Some((_, best_dist)) if distance < *best_dist => {
302 best_match = Some((col, distance));
303 }
304 _ => {}
305 }
306 }
307 }
308
309 best_match.map(|(name, _)| name)
310 }
311
312 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
314 let len1 = s1.len();
315 let len2 = s2.len();
316 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
317
318 for i in 0..=len1 {
319 matrix[i][0] = i;
320 }
321 for j in 0..=len2 {
322 matrix[0][j] = j;
323 }
324
325 for (i, c1) in s1.chars().enumerate() {
326 for (j, c2) in s2.chars().enumerate() {
327 let cost = usize::from(c1 != c2);
328 matrix[i + 1][j + 1] = std::cmp::min(
329 matrix[i][j + 1] + 1, std::cmp::min(
331 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
334 );
335 }
336 }
337
338 matrix[len1][len2]
339 }
340
341 fn contains_unnest(expr: &SqlExpression) -> bool {
343 match expr {
344 SqlExpression::Unnest { .. } => true,
346 SqlExpression::FunctionCall { name, args, .. } => {
347 if name.to_uppercase() == "UNNEST" {
348 return true;
349 }
350 args.iter().any(Self::contains_unnest)
352 }
353 SqlExpression::BinaryOp { left, right, .. } => {
354 Self::contains_unnest(left) || Self::contains_unnest(right)
355 }
356 SqlExpression::Not { expr } => Self::contains_unnest(expr),
357 SqlExpression::CaseExpression {
358 when_branches,
359 else_branch,
360 } => {
361 when_branches.iter().any(|branch| {
362 Self::contains_unnest(&branch.condition)
363 || Self::contains_unnest(&branch.result)
364 }) || else_branch
365 .as_ref()
366 .map_or(false, |e| Self::contains_unnest(e))
367 }
368 SqlExpression::SimpleCaseExpression {
369 expr,
370 when_branches,
371 else_branch,
372 } => {
373 Self::contains_unnest(expr)
374 || when_branches.iter().any(|branch| {
375 Self::contains_unnest(&branch.value)
376 || Self::contains_unnest(&branch.result)
377 })
378 || else_branch
379 .as_ref()
380 .map_or(false, |e| Self::contains_unnest(e))
381 }
382 SqlExpression::InList { expr, values } => {
383 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
384 }
385 SqlExpression::NotInList { expr, values } => {
386 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
387 }
388 SqlExpression::Between { expr, lower, upper } => {
389 Self::contains_unnest(expr)
390 || Self::contains_unnest(lower)
391 || Self::contains_unnest(upper)
392 }
393 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
394 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
397 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::ChainedMethodCall { base, args, .. } => {
399 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
400 }
401 _ => false,
402 }
403 }
404
405 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407 let (view, _plan) = self.execute_with_plan(table, sql)?;
408 Ok(view)
409 }
410
411 pub fn execute_with_temp_tables(
413 &self,
414 table: Arc<DataTable>,
415 sql: &str,
416 temp_tables: Option<&TempTableRegistry>,
417 ) -> Result<DataView> {
418 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419 Ok(view)
420 }
421
422 pub fn execute_statement(
424 &self,
425 table: Arc<DataTable>,
426 statement: SelectStatement,
427 ) -> Result<DataView> {
428 self.execute_statement_with_temp_tables(table, statement, None)
429 }
430
431 pub fn execute_statement_with_temp_tables(
433 &self,
434 table: Arc<DataTable>,
435 statement: SelectStatement,
436 temp_tables: Option<&TempTableRegistry>,
437 ) -> Result<DataView> {
438 let mut cte_context = HashMap::new();
440
441 if let Some(temp_registry) = temp_tables {
443 for table_name in temp_registry.list_tables() {
444 if let Some(temp_table) = temp_registry.get(&table_name) {
445 debug!("Adding temp table {} to CTE context", table_name);
446 let view = DataView::new(temp_table);
447 cte_context.insert(table_name, Arc::new(view));
448 }
449 }
450 }
451
452 for cte in &statement.ctes {
453 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454 let cte_result = match &cte.cte_type {
456 CTEType::Standard(query) => {
457 let view = self.build_view_with_context(
459 table.clone(),
460 query.clone(),
461 &mut cte_context,
462 )?;
463
464 let mut materialized = self.materialize_view(view)?;
466
467 for column in materialized.columns_mut() {
469 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470 column.source_table = Some(cte.name.clone());
471 }
472
473 DataView::new(Arc::new(materialized))
474 }
475 CTEType::Web(web_spec) => {
476 use crate::web::http_fetcher::WebDataFetcher;
478
479 let fetcher = WebDataFetcher::new()?;
480 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483 for column in data_table.columns_mut() {
485 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486 column.source_table = Some(cte.name.clone());
487 }
488
489 DataView::new(Arc::new(data_table))
491 }
492 };
493 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495 debug!(
496 "QueryEngine: CTE '{}' pre-processed, stored in context",
497 cte.name
498 );
499 }
500
501 let mut subquery_executor =
503 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506 self.build_view_with_context(table, processed_statement, &mut cte_context)
508 }
509
510 pub fn execute_statement_with_cte_context(
512 &self,
513 table: Arc<DataTable>,
514 statement: SelectStatement,
515 cte_context: &HashMap<String, Arc<DataView>>,
516 ) -> Result<DataView> {
517 let mut local_context = cte_context.clone();
519
520 for cte in &statement.ctes {
522 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523 let cte_result = match &cte.cte_type {
524 CTEType::Standard(query) => {
525 let view = self.build_view_with_context(
526 table.clone(),
527 query.clone(),
528 &mut local_context,
529 )?;
530
531 let mut materialized = self.materialize_view(view)?;
533
534 for column in materialized.columns_mut() {
536 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537 column.source_table = Some(cte.name.clone());
538 }
539
540 DataView::new(Arc::new(materialized))
541 }
542 CTEType::Web(web_spec) => {
543 use crate::web::http_fetcher::WebDataFetcher;
545
546 let fetcher = WebDataFetcher::new()?;
547 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550 for column in data_table.columns_mut() {
552 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553 column.source_table = Some(cte.name.clone());
554 }
555
556 DataView::new(Arc::new(data_table))
558 }
559 };
560 local_context.insert(cte.name.clone(), Arc::new(cte_result));
561 }
562
563 let mut subquery_executor =
565 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568 self.build_view_with_context(table, processed_statement, &mut local_context)
570 }
571
572 pub fn execute_with_plan(
574 &self,
575 table: Arc<DataTable>,
576 sql: &str,
577 ) -> Result<(DataView, ExecutionPlan)> {
578 self.execute_with_plan_and_temp_tables(table, sql, None)
579 }
580
581 pub fn execute_with_plan_and_temp_tables(
583 &self,
584 table: Arc<DataTable>,
585 sql: &str,
586 temp_tables: Option<&TempTableRegistry>,
587 ) -> Result<(DataView, ExecutionPlan)> {
588 let mut plan_builder = ExecutionPlanBuilder::new();
589 let start_time = Instant::now();
590
591 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
593 plan_builder.add_detail(format!("Query: {}", sql));
594 let mut parser = Parser::new(sql);
595 let statement = parser
596 .parse()
597 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
598 plan_builder.add_detail(format!("Parsed successfully"));
599 if let Some(from) = &statement.from_table {
600 plan_builder.add_detail(format!("FROM: {}", from));
601 }
602 if statement.where_clause.is_some() {
603 plan_builder.add_detail("WHERE clause present".to_string());
604 }
605 plan_builder.end_step();
606
607 let mut cte_context = HashMap::new();
609
610 if let Some(temp_registry) = temp_tables {
612 for table_name in temp_registry.list_tables() {
613 if let Some(temp_table) = temp_registry.get(&table_name) {
614 debug!("Adding temp table {} to CTE context", table_name);
615 let view = DataView::new(temp_table);
616 cte_context.insert(table_name, Arc::new(view));
617 }
618 }
619 }
620
621 if !statement.ctes.is_empty() {
622 plan_builder.begin_step(
623 StepType::CTE,
624 format!("Process {} CTEs", statement.ctes.len()),
625 );
626
627 for cte in &statement.ctes {
628 let cte_start = Instant::now();
629 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
630
631 let cte_result = match &cte.cte_type {
632 CTEType::Standard(query) => {
633 if let Some(from) = &query.from_table {
635 plan_builder.add_detail(format!("Source: {}", from));
636 }
637 if query.where_clause.is_some() {
638 plan_builder.add_detail("Has WHERE clause".to_string());
639 }
640 if query.group_by.is_some() {
641 plan_builder.add_detail("Has GROUP BY".to_string());
642 }
643
644 debug!(
645 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
646 cte.name,
647 cte_context.keys().collect::<Vec<_>>()
648 );
649
650 let mut subquery_executor = SubqueryExecutor::with_cte_context(
653 self.clone(),
654 table.clone(),
655 cte_context.clone(),
656 );
657 let processed_query = subquery_executor.execute_subqueries(query)?;
658
659 let view = self.build_view_with_context(
660 table.clone(),
661 processed_query,
662 &mut cte_context,
663 )?;
664
665 let mut materialized = self.materialize_view(view)?;
667
668 for column in materialized.columns_mut() {
670 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
671 column.source_table = Some(cte.name.clone());
672 }
673
674 DataView::new(Arc::new(materialized))
675 }
676 CTEType::Web(web_spec) => {
677 plan_builder.add_detail(format!("URL: {}", web_spec.url));
678 if let Some(format) = &web_spec.format {
679 plan_builder.add_detail(format!("Format: {:?}", format));
680 }
681 if let Some(cache) = web_spec.cache_seconds {
682 plan_builder.add_detail(format!("Cache: {} seconds", cache));
683 }
684
685 use crate::web::http_fetcher::WebDataFetcher;
687
688 let fetcher = WebDataFetcher::new()?;
689 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
691
692 for column in data_table.columns_mut() {
694 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
695 column.source_table = Some(cte.name.clone());
696 }
697
698 DataView::new(Arc::new(data_table))
700 }
701 };
702
703 plan_builder.set_rows_out(cte_result.row_count());
705 plan_builder.add_detail(format!(
706 "Result: {} rows, {} columns",
707 cte_result.row_count(),
708 cte_result.column_count()
709 ));
710 plan_builder.add_detail(format!(
711 "Execution time: {:.3}ms",
712 cte_start.elapsed().as_secs_f64() * 1000.0
713 ));
714
715 debug!(
716 "QueryEngine: Storing CTE '{}' in context with {} rows",
717 cte.name,
718 cte_result.row_count()
719 );
720 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
721 plan_builder.end_step();
722 }
723
724 plan_builder.add_detail(format!(
725 "All {} CTEs cached in context",
726 statement.ctes.len()
727 ));
728 plan_builder.end_step();
729 }
730
731 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
733 let mut subquery_executor =
734 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
735
736 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
738 format!("{:?}", w).contains("Subquery")
740 });
741
742 if has_subqueries {
743 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
744 }
745
746 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
747
748 if has_subqueries {
749 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
750 } else {
751 plan_builder.add_detail("No subqueries to process".to_string());
752 }
753
754 plan_builder.end_step();
755 let result = self.build_view_with_context_and_plan(
756 table,
757 processed_statement,
758 &mut cte_context,
759 &mut plan_builder,
760 )?;
761
762 let total_duration = start_time.elapsed();
763 info!(
764 "Query execution complete: total={:?}, rows={}",
765 total_duration,
766 result.row_count()
767 );
768
769 let plan = plan_builder.build();
770 Ok((result, plan))
771 }
772
773 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775 let mut cte_context = HashMap::new();
776 self.build_view_with_context(table, statement, &mut cte_context)
777 }
778
779 fn build_view_with_context(
781 &self,
782 table: Arc<DataTable>,
783 statement: SelectStatement,
784 cte_context: &mut HashMap<String, Arc<DataView>>,
785 ) -> Result<DataView> {
786 let mut dummy_plan = ExecutionPlanBuilder::new();
787 let mut exec_context = ExecutionContext::new();
788 self.build_view_with_context_and_plan_and_exec(
789 table,
790 statement,
791 cte_context,
792 &mut dummy_plan,
793 &mut exec_context,
794 )
795 }
796
797 fn build_view_with_context_and_plan(
799 &self,
800 table: Arc<DataTable>,
801 statement: SelectStatement,
802 cte_context: &mut HashMap<String, Arc<DataView>>,
803 plan: &mut ExecutionPlanBuilder,
804 ) -> Result<DataView> {
805 let mut exec_context = ExecutionContext::new();
806 self.build_view_with_context_and_plan_and_exec(
807 table,
808 statement,
809 cte_context,
810 plan,
811 &mut exec_context,
812 )
813 }
814
815 fn build_view_with_context_and_plan_and_exec(
817 &self,
818 table: Arc<DataTable>,
819 statement: SelectStatement,
820 cte_context: &mut HashMap<String, Arc<DataView>>,
821 plan: &mut ExecutionPlanBuilder,
822 exec_context: &mut ExecutionContext,
823 ) -> Result<DataView> {
824 for cte in &statement.ctes {
826 if cte_context.contains_key(&cte.name) {
828 debug!(
829 "QueryEngine: CTE '{}' already in context, skipping",
830 cte.name
831 );
832 continue;
833 }
834
835 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836 debug!(
837 "QueryEngine: Available CTEs for '{}': {:?}",
838 cte.name,
839 cte_context.keys().collect::<Vec<_>>()
840 );
841
842 let cte_result = match &cte.cte_type {
844 CTEType::Standard(query) => {
845 let view =
846 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848 let mut materialized = self.materialize_view(view)?;
850
851 for column in materialized.columns_mut() {
853 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854 column.source_table = Some(cte.name.clone());
855 }
856
857 DataView::new(Arc::new(materialized))
858 }
859 CTEType::Web(_web_spec) => {
860 return Err(anyhow!(
862 "Web CTEs should be processed in execute_select method"
863 ));
864 }
865 };
866
867 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869 debug!(
870 "QueryEngine: CTE '{}' processed, stored in context",
871 cte.name
872 );
873 }
874
875 let source_table = if let Some(ref table_func) = statement.from_function {
877 debug!("QueryEngine: Processing table function...");
879 match table_func {
880 TableFunction::Generator { name, args } => {
881 use crate::sql::generators::GeneratorRegistry;
883
884 let registry = GeneratorRegistry::new();
886
887 if let Some(generator) = registry.get(name) {
888 let mut evaluator = ArithmeticEvaluator::with_date_notation(
890 &table,
891 self.date_notation.clone(),
892 );
893 let dummy_row = 0;
894
895 let mut evaluated_args = Vec::new();
896 for arg in args {
897 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898 }
899
900 generator.generate(evaluated_args)?
902 } else {
903 return Err(anyhow!("Unknown generator function: {}", name));
904 }
905 }
906 }
907 } else if let Some(ref subquery) = statement.from_subquery {
908 debug!("QueryEngine: Processing FROM subquery...");
910 let subquery_result =
911 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913 let materialized = self.materialize_view(subquery_result)?;
916 Arc::new(materialized)
917 } else if let Some(ref table_name) = statement.from_table {
918 if let Some(cte_view) = cte_context.get(table_name) {
920 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921 let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924 if let Some(ref alias) = statement.from_alias {
926 debug!(
927 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928 alias, table_name
929 );
930 for column in materialized.columns_mut() {
931 if let Some(ref qualified_name) = column.qualified_name {
933 if qualified_name.starts_with(&format!("{}.", table_name)) {
934 column.qualified_name =
935 Some(qualified_name.replace(
936 &format!("{}.", table_name),
937 &format!("{}.", alias),
938 ));
939 }
940 }
941 if column.source_table.as_ref() == Some(table_name) {
943 column.source_table = Some(alias.clone());
944 }
945 }
946 }
947
948 Arc::new(materialized)
949 } else {
950 table.clone()
952 }
953 } else {
954 table.clone()
956 };
957
958 if let Some(ref alias) = statement.from_alias {
960 if let Some(ref table_name) = statement.from_table {
961 exec_context.register_alias(alias.clone(), table_name.clone());
962 }
963 }
964
965 let final_table = if !statement.joins.is_empty() {
967 plan.begin_step(
968 StepType::Join,
969 format!("Process {} JOINs", statement.joins.len()),
970 );
971 plan.set_rows_in(source_table.row_count());
972
973 let join_executor = HashJoinExecutor::new(self.case_insensitive);
974 let mut current_table = source_table;
975
976 for (idx, join_clause) in statement.joins.iter().enumerate() {
977 let join_start = Instant::now();
978 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981 plan.add_detail(format!(
982 "Executing {:?} JOIN on {} condition(s)",
983 join_clause.join_type,
984 join_clause.condition.conditions.len()
985 ));
986
987 let right_table = match &join_clause.table {
989 TableSource::Table(name) => {
990 if let Some(cte_view) = cte_context.get(name) {
992 let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994 if let Some(ref alias) = join_clause.alias {
996 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997 for column in materialized.columns_mut() {
998 if let Some(ref qualified_name) = column.qualified_name {
1000 if qualified_name.starts_with(&format!("{}.", name)) {
1001 column.qualified_name = Some(qualified_name.replace(
1002 &format!("{}.", name),
1003 &format!("{}.", alias),
1004 ));
1005 }
1006 }
1007 if column.source_table.as_ref() == Some(name) {
1009 column.source_table = Some(alias.clone());
1010 }
1011 }
1012 }
1013
1014 Arc::new(materialized)
1015 } else {
1016 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019 }
1020 }
1021 TableSource::DerivedTable { query, alias: _ } => {
1022 let subquery_result = self.build_view_with_context(
1024 table.clone(),
1025 *query.clone(),
1026 cte_context,
1027 )?;
1028 let materialized = self.materialize_view(subquery_result)?;
1029 Arc::new(materialized)
1030 }
1031 };
1032
1033 let joined = join_executor.execute_join(
1035 current_table.clone(),
1036 join_clause,
1037 right_table.clone(),
1038 )?;
1039
1040 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041 plan.set_rows_out(joined.row_count());
1042 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043 plan.add_detail(format!(
1044 "Join time: {:.3}ms",
1045 join_start.elapsed().as_secs_f64() * 1000.0
1046 ));
1047 plan.end_step();
1048
1049 current_table = Arc::new(joined);
1050 }
1051
1052 plan.set_rows_out(current_table.row_count());
1053 plan.add_detail(format!(
1054 "Final result after all joins: {} rows",
1055 current_table.row_count()
1056 ));
1057 plan.end_step();
1058 current_table
1059 } else {
1060 source_table
1061 };
1062
1063 self.build_view_internal_with_plan_and_exec(
1065 final_table,
1066 statement,
1067 plan,
1068 Some(exec_context),
1069 )
1070 }
1071
1072 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074 let source = view.source();
1075 let mut result_table = DataTable::new("derived");
1076
1077 let visible_cols = view.visible_column_indices().to_vec();
1079
1080 for col_idx in &visible_cols {
1082 let col = &source.columns[*col_idx];
1083 let new_col = DataColumn {
1084 name: col.name.clone(),
1085 data_type: col.data_type.clone(),
1086 nullable: col.nullable,
1087 unique_values: col.unique_values,
1088 null_count: col.null_count,
1089 metadata: col.metadata.clone(),
1090 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1093 result_table.add_column(new_col);
1094 }
1095
1096 for row_idx in view.visible_row_indices() {
1098 let source_row = &source.rows[*row_idx];
1099 let mut new_row = DataRow { values: Vec::new() };
1100
1101 for col_idx in &visible_cols {
1102 new_row.values.push(source_row.values[*col_idx].clone());
1103 }
1104
1105 result_table.add_row(new_row);
1106 }
1107
1108 Ok(result_table)
1109 }
1110
1111 fn build_view_internal(
1112 &self,
1113 table: Arc<DataTable>,
1114 statement: SelectStatement,
1115 ) -> Result<DataView> {
1116 let mut dummy_plan = ExecutionPlanBuilder::new();
1117 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118 }
1119
1120 fn build_view_internal_with_plan(
1121 &self,
1122 table: Arc<DataTable>,
1123 statement: SelectStatement,
1124 plan: &mut ExecutionPlanBuilder,
1125 ) -> Result<DataView> {
1126 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1127 }
1128
1129 fn build_view_internal_with_plan_and_exec(
1130 &self,
1131 table: Arc<DataTable>,
1132 statement: SelectStatement,
1133 plan: &mut ExecutionPlanBuilder,
1134 exec_context: Option<&ExecutionContext>,
1135 ) -> Result<DataView> {
1136 debug!(
1137 "QueryEngine::build_view - select_items: {:?}",
1138 statement.select_items
1139 );
1140 debug!(
1141 "QueryEngine::build_view - where_clause: {:?}",
1142 statement.where_clause
1143 );
1144
1145 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148 if let Some(where_clause) = &statement.where_clause {
1150 let total_rows = table.row_count();
1151 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155 plan.set_rows_in(total_rows);
1156 plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158 for condition in &where_clause.conditions {
1160 plan.add_detail(format!("Condition: {:?}", condition.expr));
1161 }
1162
1163 let filter_start = Instant::now();
1164 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167 let mut evaluator = if let Some(exec_ctx) = exec_context {
1169 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1171 } else {
1172 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1173 };
1174
1175 let mut filtered_rows = Vec::new();
1177 for row_idx in visible_rows {
1178 if row_idx < 3 {
1180 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1181 }
1182
1183 match evaluator.evaluate(where_clause, row_idx) {
1184 Ok(result) => {
1185 if row_idx < 3 {
1186 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1187 }
1188 if result {
1189 filtered_rows.push(row_idx);
1190 }
1191 }
1192 Err(e) => {
1193 if row_idx < 3 {
1194 debug!(
1195 "QueryEngine: WHERE evaluation error for row {}: {}",
1196 row_idx, e
1197 );
1198 }
1199 return Err(e);
1201 }
1202 }
1203 }
1204
1205 let (compilations, cache_hits) = eval_context.get_stats();
1207 if compilations > 0 || cache_hits > 0 {
1208 debug!(
1209 "LIKE pattern cache: {} compilations, {} cache hits",
1210 compilations, cache_hits
1211 );
1212 }
1213 visible_rows = filtered_rows;
1214 let filter_duration = filter_start.elapsed();
1215 info!(
1216 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1217 total_rows,
1218 visible_rows.len(),
1219 filter_duration
1220 );
1221
1222 plan.set_rows_out(visible_rows.len());
1223 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1224 plan.add_detail(format!(
1225 "Filter time: {:.3}ms",
1226 filter_duration.as_secs_f64() * 1000.0
1227 ));
1228 plan.end_step();
1229 }
1230
1231 let mut view = DataView::new(table.clone());
1233 view = view.with_rows(visible_rows);
1234
1235 if let Some(group_by_exprs) = &statement.group_by {
1237 if !group_by_exprs.is_empty() {
1238 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1239
1240 plan.begin_step(
1241 StepType::GroupBy,
1242 format!("GROUP BY {} expressions", group_by_exprs.len()),
1243 );
1244 plan.set_rows_in(view.row_count());
1245 plan.add_detail(format!("Input: {} rows", view.row_count()));
1246 for expr in group_by_exprs {
1247 plan.add_detail(format!("Group by: {:?}", expr));
1248 }
1249
1250 let group_start = Instant::now();
1251 view = self.apply_group_by(
1252 view,
1253 group_by_exprs,
1254 &statement.select_items,
1255 statement.having.as_ref(),
1256 plan,
1257 )?;
1258
1259 plan.set_rows_out(view.row_count());
1260 plan.add_detail(format!("Output: {} groups", view.row_count()));
1261 plan.add_detail(format!(
1262 "Overall time: {:.3}ms",
1263 group_start.elapsed().as_secs_f64() * 1000.0
1264 ));
1265 plan.end_step();
1266 }
1267 } else {
1268 if !statement.select_items.is_empty() {
1270 let has_non_star_items = statement
1272 .select_items
1273 .iter()
1274 .any(|item| !matches!(item, SelectItem::Star { .. }));
1275
1276 if has_non_star_items || statement.select_items.len() > 1 {
1280 view = self.apply_select_items(
1281 view,
1282 &statement.select_items,
1283 &statement,
1284 exec_context,
1285 )?;
1286 }
1287 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1289 debug!("QueryEngine: Using legacy columns path");
1290 let source_table = view.source();
1293 let column_indices =
1294 self.resolve_column_indices(source_table, &statement.columns)?;
1295 view = view.with_columns(column_indices);
1296 }
1297 }
1298
1299 if statement.distinct {
1301 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1302 plan.set_rows_in(view.row_count());
1303 plan.add_detail(format!("Input: {} rows", view.row_count()));
1304
1305 let distinct_start = Instant::now();
1306 view = self.apply_distinct(view)?;
1307
1308 plan.set_rows_out(view.row_count());
1309 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1310 plan.add_detail(format!(
1311 "Distinct time: {:.3}ms",
1312 distinct_start.elapsed().as_secs_f64() * 1000.0
1313 ));
1314 plan.end_step();
1315 }
1316
1317 if let Some(order_by_columns) = &statement.order_by {
1319 if !order_by_columns.is_empty() {
1320 plan.begin_step(
1321 StepType::Sort,
1322 format!("ORDER BY {} columns", order_by_columns.len()),
1323 );
1324 plan.set_rows_in(view.row_count());
1325 for col in order_by_columns {
1326 plan.add_detail(format!("{} {:?}", col.column, col.direction));
1327 }
1328
1329 let sort_start = Instant::now();
1330 view =
1331 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1332
1333 plan.add_detail(format!(
1334 "Sort time: {:.3}ms",
1335 sort_start.elapsed().as_secs_f64() * 1000.0
1336 ));
1337 plan.end_step();
1338 }
1339 }
1340
1341 if let Some(limit) = statement.limit {
1343 let offset = statement.offset.unwrap_or(0);
1344 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1345 plan.set_rows_in(view.row_count());
1346 if offset > 0 {
1347 plan.add_detail(format!("OFFSET: {}", offset));
1348 }
1349 view = view.with_limit(limit, offset);
1350 plan.set_rows_out(view.row_count());
1351 plan.add_detail(format!("Output: {} rows", view.row_count()));
1352 plan.end_step();
1353 }
1354
1355 if !statement.set_operations.is_empty() {
1357 plan.begin_step(
1358 StepType::SetOperation,
1359 format!("Process {} set operations", statement.set_operations.len()),
1360 );
1361 plan.set_rows_in(view.row_count());
1362
1363 let mut combined_table = self.materialize_view(view)?;
1365 let first_columns = combined_table.column_names();
1366 let first_column_count = first_columns.len();
1367
1368 let mut needs_deduplication = false;
1370
1371 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1373 let op_start = Instant::now();
1374 plan.begin_step(
1375 StepType::SetOperation,
1376 format!("{:?} operation #{}", operation, idx + 1),
1377 );
1378
1379 let next_view = if let Some(exec_ctx) = exec_context {
1382 self.build_view_internal_with_plan_and_exec(
1383 table.clone(),
1384 *next_statement.clone(),
1385 plan,
1386 Some(exec_ctx),
1387 )?
1388 } else {
1389 self.build_view_internal_with_plan(
1390 table.clone(),
1391 *next_statement.clone(),
1392 plan,
1393 )?
1394 };
1395
1396 let next_table = self.materialize_view(next_view)?;
1398 let next_columns = next_table.column_names();
1399 let next_column_count = next_columns.len();
1400
1401 if first_column_count != next_column_count {
1403 return Err(anyhow!(
1404 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1405 first_column_count,
1406 idx + 2,
1407 next_column_count
1408 ));
1409 }
1410
1411 for (col_idx, (first_col, next_col)) in
1413 first_columns.iter().zip(next_columns.iter()).enumerate()
1414 {
1415 if !first_col.eq_ignore_ascii_case(next_col) {
1416 debug!(
1417 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1418 col_idx + 1,
1419 first_col,
1420 next_col
1421 );
1422 }
1423 }
1424
1425 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1426 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1427
1428 match operation {
1430 SetOperation::UnionAll => {
1431 for row in next_table.rows.iter() {
1433 combined_table.add_row(row.clone());
1434 }
1435 plan.add_detail(format!(
1436 "Result: {} rows (no deduplication)",
1437 combined_table.row_count()
1438 ));
1439 }
1440 SetOperation::Union => {
1441 for row in next_table.rows.iter() {
1443 combined_table.add_row(row.clone());
1444 }
1445 needs_deduplication = true;
1446 plan.add_detail(format!(
1447 "Combined: {} rows (deduplication pending)",
1448 combined_table.row_count()
1449 ));
1450 }
1451 SetOperation::Intersect => {
1452 return Err(anyhow!("INTERSECT is not yet implemented"));
1455 }
1456 SetOperation::Except => {
1457 return Err(anyhow!("EXCEPT is not yet implemented"));
1460 }
1461 }
1462
1463 plan.add_detail(format!(
1464 "Operation time: {:.3}ms",
1465 op_start.elapsed().as_secs_f64() * 1000.0
1466 ));
1467 plan.set_rows_out(combined_table.row_count());
1468 plan.end_step();
1469 }
1470
1471 plan.set_rows_out(combined_table.row_count());
1472 plan.add_detail(format!(
1473 "Combined result: {} rows after {} operations",
1474 combined_table.row_count(),
1475 statement.set_operations.len()
1476 ));
1477 plan.end_step();
1478
1479 view = DataView::new(Arc::new(combined_table));
1481
1482 if needs_deduplication {
1484 plan.begin_step(
1485 StepType::Distinct,
1486 "UNION deduplication - remove duplicate rows".to_string(),
1487 );
1488 plan.set_rows_in(view.row_count());
1489 plan.add_detail(format!("Input: {} rows", view.row_count()));
1490
1491 let distinct_start = Instant::now();
1492 view = self.apply_distinct(view)?;
1493
1494 plan.set_rows_out(view.row_count());
1495 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1496 plan.add_detail(format!(
1497 "Deduplication time: {:.3}ms",
1498 distinct_start.elapsed().as_secs_f64() * 1000.0
1499 ));
1500 plan.end_step();
1501 }
1502 }
1503
1504 Ok(view)
1505 }
1506
1507 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1509 let mut indices = Vec::new();
1510 let table_columns = table.column_names();
1511
1512 for col_name in columns {
1513 let index = table_columns
1514 .iter()
1515 .position(|c| c.eq_ignore_ascii_case(col_name))
1516 .ok_or_else(|| {
1517 let suggestion = self.find_similar_column(table, col_name);
1518 match suggestion {
1519 Some(similar) => anyhow::anyhow!(
1520 "Column '{}' not found. Did you mean '{}'?",
1521 col_name,
1522 similar
1523 ),
1524 None => anyhow::anyhow!("Column '{}' not found", col_name),
1525 }
1526 })?;
1527 indices.push(index);
1528 }
1529
1530 Ok(indices)
1531 }
1532
1533 fn apply_select_items(
1535 &self,
1536 view: DataView,
1537 select_items: &[SelectItem],
1538 _statement: &SelectStatement,
1539 exec_context: Option<&ExecutionContext>,
1540 ) -> Result<DataView> {
1541 debug!(
1542 "QueryEngine::apply_select_items - items: {:?}",
1543 select_items
1544 );
1545 debug!(
1546 "QueryEngine::apply_select_items - input view has {} rows",
1547 view.row_count()
1548 );
1549
1550 let has_unnest = select_items.iter().any(|item| match item {
1552 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1553 _ => false,
1554 });
1555
1556 if has_unnest {
1557 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1558 return self.apply_select_with_row_expansion(view, select_items);
1559 }
1560
1561 let has_aggregates = select_items.iter().any(|item| match item {
1565 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1566 SelectItem::Column { .. } => false,
1567 SelectItem::Star { .. } => false,
1568 });
1569
1570 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1571 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1572 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, });
1575
1576 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1577 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1580 return self.apply_aggregate_select(view, select_items);
1581 }
1582
1583 let has_computed_expressions = select_items
1585 .iter()
1586 .any(|item| matches!(item, SelectItem::Expression { .. }));
1587
1588 debug!(
1589 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1590 has_computed_expressions
1591 );
1592
1593 if !has_computed_expressions {
1594 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1596 return Ok(view.with_columns(column_indices));
1597 }
1598
1599 let source_table = view.source();
1604 let visible_rows = view.visible_row_indices();
1605
1606 let mut computed_table = DataTable::new("query_result");
1609
1610 let mut expanded_items = Vec::new();
1612 for item in select_items {
1613 match item {
1614 SelectItem::Star { .. } => {
1615 for col_name in source_table.column_names() {
1617 expanded_items.push(SelectItem::Column {
1618 column: ColumnRef::unquoted(col_name.to_string()),
1619 leading_comments: vec![],
1620 trailing_comment: None,
1621 });
1622 }
1623 }
1624 _ => expanded_items.push(item.clone()),
1625 }
1626 }
1627
1628 let mut column_name_counts: std::collections::HashMap<String, usize> =
1630 std::collections::HashMap::new();
1631
1632 for item in &expanded_items {
1633 let base_name = match item {
1634 SelectItem::Column {
1635 column: col_ref, ..
1636 } => col_ref.name.clone(),
1637 SelectItem::Expression { alias, .. } => alias.clone(),
1638 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1639 };
1640
1641 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1643 let column_name = if *count == 0 {
1644 base_name.clone()
1646 } else {
1647 format!("{base_name}_{count}")
1649 };
1650 *count += 1;
1651
1652 computed_table.add_column(DataColumn::new(&column_name));
1653 }
1654
1655 let mut evaluator =
1657 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1658
1659 if let Some(exec_ctx) = exec_context {
1661 let aliases = exec_ctx.get_aliases();
1662 if !aliases.is_empty() {
1663 debug!(
1664 "Applying {} aliases to evaluator: {:?}",
1665 aliases.len(),
1666 aliases
1667 );
1668 evaluator = evaluator.with_table_aliases(aliases);
1669 }
1670 }
1671
1672 for &row_idx in visible_rows {
1673 let mut row_values = Vec::new();
1674
1675 for item in &expanded_items {
1676 let value = match item {
1677 SelectItem::Column {
1678 column: col_ref, ..
1679 } => {
1680 match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1682 Ok(val) => val,
1683 Err(e) => {
1684 return Err(anyhow!(
1685 "Failed to evaluate column {}: {}",
1686 col_ref.to_sql(),
1687 e
1688 ));
1689 }
1690 }
1691 }
1692 SelectItem::Expression { expr, .. } => {
1693 evaluator.evaluate(&expr, row_idx)?
1695 }
1696 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1697 };
1698 row_values.push(value);
1699 }
1700
1701 computed_table
1702 .add_row(DataRow::new(row_values))
1703 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1704 }
1705
1706 Ok(DataView::new(Arc::new(computed_table)))
1709 }
1710
1711 fn apply_select_with_row_expansion(
1713 &self,
1714 view: DataView,
1715 select_items: &[SelectItem],
1716 ) -> Result<DataView> {
1717 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1718
1719 let source_table = view.source();
1720 let visible_rows = view.visible_row_indices();
1721 let expander_registry = RowExpanderRegistry::new();
1722
1723 let mut result_table = DataTable::new("unnest_result");
1725
1726 let mut expanded_items = Vec::new();
1728 for item in select_items {
1729 match item {
1730 SelectItem::Star { .. } => {
1731 for col_name in source_table.column_names() {
1732 expanded_items.push(SelectItem::Column {
1733 column: ColumnRef::unquoted(col_name.to_string()),
1734 leading_comments: vec![],
1735 trailing_comment: None,
1736 });
1737 }
1738 }
1739 _ => expanded_items.push(item.clone()),
1740 }
1741 }
1742
1743 for item in &expanded_items {
1745 let column_name = match item {
1746 SelectItem::Column {
1747 column: col_ref, ..
1748 } => col_ref.name.clone(),
1749 SelectItem::Expression { alias, .. } => alias.clone(),
1750 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1751 };
1752 result_table.add_column(DataColumn::new(&column_name));
1753 }
1754
1755 let mut evaluator =
1757 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1758
1759 for &row_idx in visible_rows {
1760 let mut unnest_expansions = Vec::new();
1762 let mut unnest_indices = Vec::new();
1763
1764 for (col_idx, item) in expanded_items.iter().enumerate() {
1765 if let SelectItem::Expression { expr, .. } = item {
1766 if let Some(expansion_result) = self.try_expand_unnest(
1767 &expr,
1768 source_table,
1769 row_idx,
1770 &mut evaluator,
1771 &expander_registry,
1772 )? {
1773 unnest_expansions.push(expansion_result);
1774 unnest_indices.push(col_idx);
1775 }
1776 }
1777 }
1778
1779 let expansion_count = if unnest_expansions.is_empty() {
1781 1 } else {
1783 unnest_expansions
1784 .iter()
1785 .map(|exp| exp.row_count())
1786 .max()
1787 .unwrap_or(1)
1788 };
1789
1790 for output_idx in 0..expansion_count {
1792 let mut row_values = Vec::new();
1793
1794 for (col_idx, item) in expanded_items.iter().enumerate() {
1795 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1797
1798 let value = if let Some(unnest_idx) = unnest_position {
1799 let expansion = &unnest_expansions[unnest_idx];
1801 expansion
1802 .values
1803 .get(output_idx)
1804 .cloned()
1805 .unwrap_or(DataValue::Null)
1806 } else {
1807 match item {
1809 SelectItem::Column {
1810 column: col_ref, ..
1811 } => {
1812 let col_idx =
1813 source_table.get_column_index(&col_ref.name).ok_or_else(
1814 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1815 )?;
1816 let row = source_table
1817 .get_row(row_idx)
1818 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1819 row.get(col_idx)
1820 .ok_or_else(|| {
1821 anyhow::anyhow!("Column {} not found in row", col_idx)
1822 })?
1823 .clone()
1824 }
1825 SelectItem::Expression { expr, .. } => {
1826 evaluator.evaluate(&expr, row_idx)?
1828 }
1829 SelectItem::Star { .. } => unreachable!(),
1830 }
1831 };
1832
1833 row_values.push(value);
1834 }
1835
1836 result_table
1837 .add_row(DataRow::new(row_values))
1838 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1839 }
1840 }
1841
1842 debug!(
1843 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1844 visible_rows.len(),
1845 result_table.row_count()
1846 );
1847
1848 Ok(DataView::new(Arc::new(result_table)))
1849 }
1850
1851 fn try_expand_unnest(
1854 &self,
1855 expr: &SqlExpression,
1856 _source_table: &DataTable,
1857 row_idx: usize,
1858 evaluator: &mut ArithmeticEvaluator,
1859 expander_registry: &RowExpanderRegistry,
1860 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1861 if let SqlExpression::Unnest { column, delimiter } = expr {
1863 let column_value = evaluator.evaluate(column, row_idx)?;
1865
1866 let delimiter_value = DataValue::String(delimiter.clone());
1868
1869 let expander = expander_registry
1871 .get("UNNEST")
1872 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1873
1874 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1876 return Ok(Some(expansion));
1877 }
1878
1879 if let SqlExpression::FunctionCall { name, args, .. } = expr {
1881 if name.to_uppercase() == "UNNEST" {
1882 if args.len() != 2 {
1884 return Err(anyhow::anyhow!(
1885 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1886 ));
1887 }
1888
1889 let column_value = evaluator.evaluate(&args[0], row_idx)?;
1891
1892 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1894
1895 let expander = expander_registry
1897 .get("UNNEST")
1898 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1899
1900 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1902 return Ok(Some(expansion));
1903 }
1904 }
1905
1906 Ok(None)
1907 }
1908
1909 fn apply_aggregate_select(
1911 &self,
1912 view: DataView,
1913 select_items: &[SelectItem],
1914 ) -> Result<DataView> {
1915 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1916
1917 let source_table = view.source();
1918 let mut result_table = DataTable::new("aggregate_result");
1919
1920 for item in select_items {
1922 let column_name = match item {
1923 SelectItem::Expression { alias, .. } => alias.clone(),
1924 _ => unreachable!("Should only have expressions in aggregate-only query"),
1925 };
1926 result_table.add_column(DataColumn::new(&column_name));
1927 }
1928
1929 let visible_rows = view.visible_row_indices().to_vec();
1931 let mut evaluator =
1932 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1933 .with_visible_rows(visible_rows);
1934
1935 let mut row_values = Vec::new();
1937 for item in select_items {
1938 match item {
1939 SelectItem::Expression { expr, .. } => {
1940 let value = evaluator.evaluate(expr, 0)?;
1943 row_values.push(value);
1944 }
1945 _ => unreachable!("Should only have expressions in aggregate-only query"),
1946 }
1947 }
1948
1949 result_table
1951 .add_row(DataRow::new(row_values))
1952 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1953
1954 Ok(DataView::new(Arc::new(result_table)))
1955 }
1956
1957 fn resolve_select_columns(
1959 &self,
1960 table: &DataTable,
1961 select_items: &[SelectItem],
1962 ) -> Result<Vec<usize>> {
1963 let mut indices = Vec::new();
1964 let table_columns = table.column_names();
1965
1966 for item in select_items {
1967 match item {
1968 SelectItem::Column {
1969 column: col_ref, ..
1970 } => {
1971 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1973 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1975 table.find_column_by_qualified_name(&qualified_name)
1976 .ok_or_else(|| {
1977 let has_qualified = table.columns.iter()
1979 .any(|c| c.qualified_name.is_some());
1980 if !has_qualified {
1981 anyhow::anyhow!(
1982 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1983 qualified_name, table_prefix
1984 )
1985 } else {
1986 anyhow::anyhow!("Column '{}' not found", qualified_name)
1987 }
1988 })?
1989 } else {
1990 table_columns
1992 .iter()
1993 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1994 .ok_or_else(|| {
1995 let suggestion = self.find_similar_column(table, &col_ref.name);
1996 match suggestion {
1997 Some(similar) => anyhow::anyhow!(
1998 "Column '{}' not found. Did you mean '{}'?",
1999 col_ref.name,
2000 similar
2001 ),
2002 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2003 }
2004 })?
2005 };
2006 indices.push(index);
2007 }
2008 SelectItem::Star { .. } => {
2009 for i in 0..table_columns.len() {
2011 indices.push(i);
2012 }
2013 }
2014 SelectItem::Expression { .. } => {
2015 return Err(anyhow::anyhow!(
2016 "Computed expressions require new table creation"
2017 ));
2018 }
2019 }
2020 }
2021
2022 Ok(indices)
2023 }
2024
2025 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2027 use std::collections::HashSet;
2028
2029 let source = view.source();
2030 let visible_cols = view.visible_column_indices();
2031 let visible_rows = view.visible_row_indices();
2032
2033 let mut seen_rows = HashSet::new();
2035 let mut unique_row_indices = Vec::new();
2036
2037 for &row_idx in visible_rows {
2038 let mut row_key = Vec::new();
2040 for &col_idx in visible_cols {
2041 let value = source
2042 .get_value(row_idx, col_idx)
2043 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2044 row_key.push(format!("{:?}", value));
2046 }
2047
2048 if seen_rows.insert(row_key) {
2050 unique_row_indices.push(row_idx);
2052 }
2053 }
2054
2055 Ok(view.with_rows(unique_row_indices))
2057 }
2058
2059 fn apply_multi_order_by(
2061 &self,
2062 view: DataView,
2063 order_by_columns: &[OrderByColumn],
2064 ) -> Result<DataView> {
2065 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2066 }
2067
2068 fn apply_multi_order_by_with_context(
2070 &self,
2071 mut view: DataView,
2072 order_by_columns: &[OrderByColumn],
2073 _exec_context: Option<&ExecutionContext>,
2074 ) -> Result<DataView> {
2075 let mut sort_columns = Vec::new();
2077
2078 for order_col in order_by_columns {
2079 let col_index = if order_col.column.contains('.') {
2081 if let Some(dot_pos) = order_col.column.rfind('.') {
2083 let col_name = &order_col.column[dot_pos + 1..];
2084
2085 debug!(
2088 "ORDER BY: Extracting unqualified column '{}' from '{}'",
2089 col_name, order_col.column
2090 );
2091 view.source().get_column_index(col_name)
2092 } else {
2093 view.source().get_column_index(&order_col.column)
2094 }
2095 } else {
2096 view.source().get_column_index(&order_col.column)
2098 }
2099 .ok_or_else(|| {
2100 let suggestion = self.find_similar_column(view.source(), &order_col.column);
2102 match suggestion {
2103 Some(similar) => anyhow::anyhow!(
2104 "Column '{}' not found. Did you mean '{}'?",
2105 order_col.column,
2106 similar
2107 ),
2108 None => {
2109 let available_cols = view.source().column_names().join(", ");
2111 anyhow::anyhow!(
2112 "Column '{}' not found. Available columns: {}",
2113 order_col.column,
2114 available_cols
2115 )
2116 }
2117 }
2118 })?;
2119
2120 let ascending = matches!(order_col.direction, SortDirection::Asc);
2121 sort_columns.push((col_index, ascending));
2122 }
2123
2124 view.apply_multi_sort(&sort_columns)?;
2126 Ok(view)
2127 }
2128
2129 fn apply_group_by(
2131 &self,
2132 view: DataView,
2133 group_by_exprs: &[SqlExpression],
2134 select_items: &[SelectItem],
2135 having: Option<&SqlExpression>,
2136 plan: &mut ExecutionPlanBuilder,
2137 ) -> Result<DataView> {
2138 let (result_view, phase_info) = self.apply_group_by_expressions(
2140 view,
2141 group_by_exprs,
2142 select_items,
2143 having,
2144 self.case_insensitive,
2145 self.date_notation.clone(),
2146 )?;
2147
2148 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2150 plan.add_detail(format!(
2151 "Phase 1 - Group Building: {:.3}ms",
2152 phase_info.phase2_key_building.as_secs_f64() * 1000.0
2153 ));
2154 plan.add_detail(format!(
2155 " • Processing {} rows into {} groups",
2156 phase_info.total_rows, phase_info.num_groups
2157 ));
2158 plan.add_detail(format!(
2159 "Phase 2 - Aggregation: {:.3}ms",
2160 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2161 ));
2162 if phase_info.phase4_having_evaluation > Duration::ZERO {
2163 plan.add_detail(format!(
2164 "Phase 3 - HAVING Filter: {:.3}ms",
2165 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2166 ));
2167 plan.add_detail(format!(
2168 " • Filtered {} groups",
2169 phase_info.groups_filtered_by_having
2170 ));
2171 }
2172 plan.add_detail(format!(
2173 "Total GROUP BY time: {:.3}ms",
2174 phase_info.total_time.as_secs_f64() * 1000.0
2175 ));
2176
2177 Ok(result_view)
2178 }
2179
2180 pub fn estimate_group_cardinality(
2183 &self,
2184 view: &DataView,
2185 group_by_exprs: &[SqlExpression],
2186 ) -> usize {
2187 let row_count = view.get_visible_rows().len();
2189 if row_count <= 100 {
2190 return row_count;
2191 }
2192
2193 let sample_size = min(1000, row_count / 10).max(100);
2195 let mut seen = FxHashSet::default();
2196
2197 let visible_rows = view.get_visible_rows();
2198 for (i, &row_idx) in visible_rows.iter().enumerate() {
2199 if i >= sample_size {
2200 break;
2201 }
2202
2203 let mut key_values = Vec::new();
2205 for expr in group_by_exprs {
2206 let mut evaluator = ArithmeticEvaluator::new(view.source());
2207 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2208 key_values.push(value);
2209 }
2210
2211 seen.insert(key_values);
2212 }
2213
2214 let sample_cardinality = seen.len();
2216 let estimated = (sample_cardinality * row_count) / sample_size;
2217
2218 estimated.min(row_count).max(sample_cardinality)
2220 }
2221}
2222
2223#[cfg(test)]
2224mod tests {
2225 use super::*;
2226 use crate::data::datatable::{DataColumn, DataRow, DataValue};
2227
2228 fn create_test_table() -> Arc<DataTable> {
2229 let mut table = DataTable::new("test");
2230
2231 table.add_column(DataColumn::new("id"));
2233 table.add_column(DataColumn::new("name"));
2234 table.add_column(DataColumn::new("age"));
2235
2236 table
2238 .add_row(DataRow::new(vec![
2239 DataValue::Integer(1),
2240 DataValue::String("Alice".to_string()),
2241 DataValue::Integer(30),
2242 ]))
2243 .unwrap();
2244
2245 table
2246 .add_row(DataRow::new(vec![
2247 DataValue::Integer(2),
2248 DataValue::String("Bob".to_string()),
2249 DataValue::Integer(25),
2250 ]))
2251 .unwrap();
2252
2253 table
2254 .add_row(DataRow::new(vec![
2255 DataValue::Integer(3),
2256 DataValue::String("Charlie".to_string()),
2257 DataValue::Integer(35),
2258 ]))
2259 .unwrap();
2260
2261 Arc::new(table)
2262 }
2263
2264 #[test]
2265 fn test_select_all() {
2266 let table = create_test_table();
2267 let engine = QueryEngine::new();
2268
2269 let view = engine
2270 .execute(table.clone(), "SELECT * FROM users")
2271 .unwrap();
2272 assert_eq!(view.row_count(), 3);
2273 assert_eq!(view.column_count(), 3);
2274 }
2275
2276 #[test]
2277 fn test_select_columns() {
2278 let table = create_test_table();
2279 let engine = QueryEngine::new();
2280
2281 let view = engine
2282 .execute(table.clone(), "SELECT name, age FROM users")
2283 .unwrap();
2284 assert_eq!(view.row_count(), 3);
2285 assert_eq!(view.column_count(), 2);
2286 }
2287
2288 #[test]
2289 fn test_select_with_limit() {
2290 let table = create_test_table();
2291 let engine = QueryEngine::new();
2292
2293 let view = engine
2294 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
2295 .unwrap();
2296 assert_eq!(view.row_count(), 2);
2297 }
2298
2299 #[test]
2300 fn test_type_coercion_contains() {
2301 let _ = tracing_subscriber::fmt()
2303 .with_max_level(tracing::Level::DEBUG)
2304 .try_init();
2305
2306 let mut table = DataTable::new("test");
2307 table.add_column(DataColumn::new("id"));
2308 table.add_column(DataColumn::new("status"));
2309 table.add_column(DataColumn::new("price"));
2310
2311 table
2313 .add_row(DataRow::new(vec![
2314 DataValue::Integer(1),
2315 DataValue::String("Pending".to_string()),
2316 DataValue::Float(99.99),
2317 ]))
2318 .unwrap();
2319
2320 table
2321 .add_row(DataRow::new(vec![
2322 DataValue::Integer(2),
2323 DataValue::String("Confirmed".to_string()),
2324 DataValue::Float(150.50),
2325 ]))
2326 .unwrap();
2327
2328 table
2329 .add_row(DataRow::new(vec![
2330 DataValue::Integer(3),
2331 DataValue::String("Pending".to_string()),
2332 DataValue::Float(75.00),
2333 ]))
2334 .unwrap();
2335
2336 let table = Arc::new(table);
2337 let engine = QueryEngine::new();
2338
2339 println!("\n=== Testing WHERE clause with Contains ===");
2340 println!("Table has {} rows", table.row_count());
2341 for i in 0..table.row_count() {
2342 let status = table.get_value(i, 1);
2343 println!("Row {i}: status = {status:?}");
2344 }
2345
2346 println!("\n--- Test 1: status.Contains('pend') ---");
2348 let result = engine.execute(
2349 table.clone(),
2350 "SELECT * FROM test WHERE status.Contains('pend')",
2351 );
2352 match result {
2353 Ok(view) => {
2354 println!("SUCCESS: Found {} matching rows", view.row_count());
2355 assert_eq!(view.row_count(), 2); }
2357 Err(e) => {
2358 panic!("Query failed: {e}");
2359 }
2360 }
2361
2362 println!("\n--- Test 2: price.Contains('9') ---");
2364 let result = engine.execute(
2365 table.clone(),
2366 "SELECT * FROM test WHERE price.Contains('9')",
2367 );
2368 match result {
2369 Ok(view) => {
2370 println!(
2371 "SUCCESS: Found {} matching rows with price containing '9'",
2372 view.row_count()
2373 );
2374 assert!(view.row_count() >= 1);
2376 }
2377 Err(e) => {
2378 panic!("Numeric coercion query failed: {e}");
2379 }
2380 }
2381
2382 println!("\n=== All tests passed! ===");
2383 }
2384
2385 #[test]
2386 fn test_not_in_clause() {
2387 let _ = tracing_subscriber::fmt()
2389 .with_max_level(tracing::Level::DEBUG)
2390 .try_init();
2391
2392 let mut table = DataTable::new("test");
2393 table.add_column(DataColumn::new("id"));
2394 table.add_column(DataColumn::new("country"));
2395
2396 table
2398 .add_row(DataRow::new(vec![
2399 DataValue::Integer(1),
2400 DataValue::String("CA".to_string()),
2401 ]))
2402 .unwrap();
2403
2404 table
2405 .add_row(DataRow::new(vec![
2406 DataValue::Integer(2),
2407 DataValue::String("US".to_string()),
2408 ]))
2409 .unwrap();
2410
2411 table
2412 .add_row(DataRow::new(vec![
2413 DataValue::Integer(3),
2414 DataValue::String("UK".to_string()),
2415 ]))
2416 .unwrap();
2417
2418 let table = Arc::new(table);
2419 let engine = QueryEngine::new();
2420
2421 println!("\n=== Testing NOT IN clause ===");
2422 println!("Table has {} rows", table.row_count());
2423 for i in 0..table.row_count() {
2424 let country = table.get_value(i, 1);
2425 println!("Row {i}: country = {country:?}");
2426 }
2427
2428 println!("\n--- Test: country NOT IN ('CA') ---");
2430 let result = engine.execute(
2431 table.clone(),
2432 "SELECT * FROM test WHERE country NOT IN ('CA')",
2433 );
2434 match result {
2435 Ok(view) => {
2436 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
2437 assert_eq!(view.row_count(), 2); }
2439 Err(e) => {
2440 panic!("NOT IN query failed: {e}");
2441 }
2442 }
2443
2444 println!("\n=== NOT IN test complete! ===");
2445 }
2446
2447 #[test]
2448 fn test_case_insensitive_in_and_not_in() {
2449 let _ = tracing_subscriber::fmt()
2451 .with_max_level(tracing::Level::DEBUG)
2452 .try_init();
2453
2454 let mut table = DataTable::new("test");
2455 table.add_column(DataColumn::new("id"));
2456 table.add_column(DataColumn::new("country"));
2457
2458 table
2460 .add_row(DataRow::new(vec![
2461 DataValue::Integer(1),
2462 DataValue::String("CA".to_string()), ]))
2464 .unwrap();
2465
2466 table
2467 .add_row(DataRow::new(vec![
2468 DataValue::Integer(2),
2469 DataValue::String("us".to_string()), ]))
2471 .unwrap();
2472
2473 table
2474 .add_row(DataRow::new(vec![
2475 DataValue::Integer(3),
2476 DataValue::String("UK".to_string()), ]))
2478 .unwrap();
2479
2480 let table = Arc::new(table);
2481
2482 println!("\n=== Testing Case-Insensitive IN clause ===");
2483 println!("Table has {} rows", table.row_count());
2484 for i in 0..table.row_count() {
2485 let country = table.get_value(i, 1);
2486 println!("Row {i}: country = {country:?}");
2487 }
2488
2489 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
2491 let engine = QueryEngine::with_case_insensitive(true);
2492 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2493 match result {
2494 Ok(view) => {
2495 println!(
2496 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
2497 view.row_count()
2498 );
2499 assert_eq!(view.row_count(), 1); }
2501 Err(e) => {
2502 panic!("Case-insensitive IN query failed: {e}");
2503 }
2504 }
2505
2506 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
2508 let result = engine.execute(
2509 table.clone(),
2510 "SELECT * FROM test WHERE country NOT IN ('ca')",
2511 );
2512 match result {
2513 Ok(view) => {
2514 println!(
2515 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
2516 view.row_count()
2517 );
2518 assert_eq!(view.row_count(), 2); }
2520 Err(e) => {
2521 panic!("Case-insensitive NOT IN query failed: {e}");
2522 }
2523 }
2524
2525 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
2527 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
2529 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2530 match result {
2531 Ok(view) => {
2532 println!(
2533 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
2534 view.row_count()
2535 );
2536 assert_eq!(view.row_count(), 0); }
2538 Err(e) => {
2539 panic!("Case-sensitive IN query failed: {e}");
2540 }
2541 }
2542
2543 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
2544 }
2545
2546 #[test]
2547 #[ignore = "Parentheses in WHERE clause not yet implemented"]
2548 fn test_parentheses_in_where_clause() {
2549 let _ = tracing_subscriber::fmt()
2551 .with_max_level(tracing::Level::DEBUG)
2552 .try_init();
2553
2554 let mut table = DataTable::new("test");
2555 table.add_column(DataColumn::new("id"));
2556 table.add_column(DataColumn::new("status"));
2557 table.add_column(DataColumn::new("priority"));
2558
2559 table
2561 .add_row(DataRow::new(vec![
2562 DataValue::Integer(1),
2563 DataValue::String("Pending".to_string()),
2564 DataValue::String("High".to_string()),
2565 ]))
2566 .unwrap();
2567
2568 table
2569 .add_row(DataRow::new(vec![
2570 DataValue::Integer(2),
2571 DataValue::String("Complete".to_string()),
2572 DataValue::String("High".to_string()),
2573 ]))
2574 .unwrap();
2575
2576 table
2577 .add_row(DataRow::new(vec![
2578 DataValue::Integer(3),
2579 DataValue::String("Pending".to_string()),
2580 DataValue::String("Low".to_string()),
2581 ]))
2582 .unwrap();
2583
2584 table
2585 .add_row(DataRow::new(vec![
2586 DataValue::Integer(4),
2587 DataValue::String("Complete".to_string()),
2588 DataValue::String("Low".to_string()),
2589 ]))
2590 .unwrap();
2591
2592 let table = Arc::new(table);
2593 let engine = QueryEngine::new();
2594
2595 println!("\n=== Testing Parentheses in WHERE clause ===");
2596 println!("Table has {} rows", table.row_count());
2597 for i in 0..table.row_count() {
2598 let status = table.get_value(i, 1);
2599 let priority = table.get_value(i, 2);
2600 println!("Row {i}: status = {status:?}, priority = {priority:?}");
2601 }
2602
2603 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2605 let result = engine.execute(
2606 table.clone(),
2607 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2608 );
2609 match result {
2610 Ok(view) => {
2611 println!(
2612 "SUCCESS: Found {} rows with parenthetical logic",
2613 view.row_count()
2614 );
2615 assert_eq!(view.row_count(), 2); }
2617 Err(e) => {
2618 panic!("Parentheses query failed: {e}");
2619 }
2620 }
2621
2622 println!("\n=== Parentheses test complete! ===");
2623 }
2624
2625 #[test]
2626 #[ignore = "Numeric type coercion needs fixing"]
2627 fn test_numeric_type_coercion() {
2628 let _ = tracing_subscriber::fmt()
2630 .with_max_level(tracing::Level::DEBUG)
2631 .try_init();
2632
2633 let mut table = DataTable::new("test");
2634 table.add_column(DataColumn::new("id"));
2635 table.add_column(DataColumn::new("price"));
2636 table.add_column(DataColumn::new("quantity"));
2637
2638 table
2640 .add_row(DataRow::new(vec![
2641 DataValue::Integer(1),
2642 DataValue::Float(99.50), DataValue::Integer(100),
2644 ]))
2645 .unwrap();
2646
2647 table
2648 .add_row(DataRow::new(vec![
2649 DataValue::Integer(2),
2650 DataValue::Float(150.0), DataValue::Integer(200),
2652 ]))
2653 .unwrap();
2654
2655 table
2656 .add_row(DataRow::new(vec![
2657 DataValue::Integer(3),
2658 DataValue::Integer(75), DataValue::Integer(50),
2660 ]))
2661 .unwrap();
2662
2663 let table = Arc::new(table);
2664 let engine = QueryEngine::new();
2665
2666 println!("\n=== Testing Numeric Type Coercion ===");
2667 println!("Table has {} rows", table.row_count());
2668 for i in 0..table.row_count() {
2669 let price = table.get_value(i, 1);
2670 let quantity = table.get_value(i, 2);
2671 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
2672 }
2673
2674 println!("\n--- Test: price.Contains('.') ---");
2676 let result = engine.execute(
2677 table.clone(),
2678 "SELECT * FROM test WHERE price.Contains('.')",
2679 );
2680 match result {
2681 Ok(view) => {
2682 println!(
2683 "SUCCESS: Found {} rows with decimal points in price",
2684 view.row_count()
2685 );
2686 assert_eq!(view.row_count(), 2); }
2688 Err(e) => {
2689 panic!("Numeric Contains query failed: {e}");
2690 }
2691 }
2692
2693 println!("\n--- Test: quantity.Contains('0') ---");
2695 let result = engine.execute(
2696 table.clone(),
2697 "SELECT * FROM test WHERE quantity.Contains('0')",
2698 );
2699 match result {
2700 Ok(view) => {
2701 println!(
2702 "SUCCESS: Found {} rows with '0' in quantity",
2703 view.row_count()
2704 );
2705 assert_eq!(view.row_count(), 2); }
2707 Err(e) => {
2708 panic!("Integer Contains query failed: {e}");
2709 }
2710 }
2711
2712 println!("\n=== Numeric type coercion test complete! ===");
2713 }
2714
2715 #[test]
2716 fn test_datetime_comparisons() {
2717 let _ = tracing_subscriber::fmt()
2719 .with_max_level(tracing::Level::DEBUG)
2720 .try_init();
2721
2722 let mut table = DataTable::new("test");
2723 table.add_column(DataColumn::new("id"));
2724 table.add_column(DataColumn::new("created_date"));
2725
2726 table
2728 .add_row(DataRow::new(vec![
2729 DataValue::Integer(1),
2730 DataValue::String("2024-12-15".to_string()),
2731 ]))
2732 .unwrap();
2733
2734 table
2735 .add_row(DataRow::new(vec![
2736 DataValue::Integer(2),
2737 DataValue::String("2025-01-15".to_string()),
2738 ]))
2739 .unwrap();
2740
2741 table
2742 .add_row(DataRow::new(vec![
2743 DataValue::Integer(3),
2744 DataValue::String("2025-02-15".to_string()),
2745 ]))
2746 .unwrap();
2747
2748 let table = Arc::new(table);
2749 let engine = QueryEngine::new();
2750
2751 println!("\n=== Testing DateTime Comparisons ===");
2752 println!("Table has {} rows", table.row_count());
2753 for i in 0..table.row_count() {
2754 let date = table.get_value(i, 1);
2755 println!("Row {i}: created_date = {date:?}");
2756 }
2757
2758 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2760 let result = engine.execute(
2761 table.clone(),
2762 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2763 );
2764 match result {
2765 Ok(view) => {
2766 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2767 assert_eq!(view.row_count(), 2); }
2769 Err(e) => {
2770 panic!("DateTime comparison query failed: {e}");
2771 }
2772 }
2773
2774 println!("\n=== DateTime comparison test complete! ===");
2775 }
2776
2777 #[test]
2778 fn test_not_with_method_calls() {
2779 let _ = tracing_subscriber::fmt()
2781 .with_max_level(tracing::Level::DEBUG)
2782 .try_init();
2783
2784 let mut table = DataTable::new("test");
2785 table.add_column(DataColumn::new("id"));
2786 table.add_column(DataColumn::new("status"));
2787
2788 table
2790 .add_row(DataRow::new(vec![
2791 DataValue::Integer(1),
2792 DataValue::String("Pending Review".to_string()),
2793 ]))
2794 .unwrap();
2795
2796 table
2797 .add_row(DataRow::new(vec![
2798 DataValue::Integer(2),
2799 DataValue::String("Complete".to_string()),
2800 ]))
2801 .unwrap();
2802
2803 table
2804 .add_row(DataRow::new(vec![
2805 DataValue::Integer(3),
2806 DataValue::String("Pending Approval".to_string()),
2807 ]))
2808 .unwrap();
2809
2810 let table = Arc::new(table);
2811 let engine = QueryEngine::with_case_insensitive(true);
2812
2813 println!("\n=== Testing NOT with Method Calls ===");
2814 println!("Table has {} rows", table.row_count());
2815 for i in 0..table.row_count() {
2816 let status = table.get_value(i, 1);
2817 println!("Row {i}: status = {status:?}");
2818 }
2819
2820 println!("\n--- Test: NOT status.Contains('pend') ---");
2822 let result = engine.execute(
2823 table.clone(),
2824 "SELECT * FROM test WHERE NOT status.Contains('pend')",
2825 );
2826 match result {
2827 Ok(view) => {
2828 println!(
2829 "SUCCESS: Found {} rows NOT containing 'pend'",
2830 view.row_count()
2831 );
2832 assert_eq!(view.row_count(), 1); }
2834 Err(e) => {
2835 panic!("NOT Contains query failed: {e}");
2836 }
2837 }
2838
2839 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2841 let result = engine.execute(
2842 table.clone(),
2843 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2844 );
2845 match result {
2846 Ok(view) => {
2847 println!(
2848 "SUCCESS: Found {} rows NOT starting with 'Pending'",
2849 view.row_count()
2850 );
2851 assert_eq!(view.row_count(), 1); }
2853 Err(e) => {
2854 panic!("NOT StartsWith query failed: {e}");
2855 }
2856 }
2857
2858 println!("\n=== NOT with method calls test complete! ===");
2859 }
2860
2861 #[test]
2862 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2863 fn test_complex_logical_expressions() {
2864 let _ = tracing_subscriber::fmt()
2866 .with_max_level(tracing::Level::DEBUG)
2867 .try_init();
2868
2869 let mut table = DataTable::new("test");
2870 table.add_column(DataColumn::new("id"));
2871 table.add_column(DataColumn::new("status"));
2872 table.add_column(DataColumn::new("priority"));
2873 table.add_column(DataColumn::new("assigned"));
2874
2875 table
2877 .add_row(DataRow::new(vec![
2878 DataValue::Integer(1),
2879 DataValue::String("Pending".to_string()),
2880 DataValue::String("High".to_string()),
2881 DataValue::String("John".to_string()),
2882 ]))
2883 .unwrap();
2884
2885 table
2886 .add_row(DataRow::new(vec![
2887 DataValue::Integer(2),
2888 DataValue::String("Complete".to_string()),
2889 DataValue::String("High".to_string()),
2890 DataValue::String("Jane".to_string()),
2891 ]))
2892 .unwrap();
2893
2894 table
2895 .add_row(DataRow::new(vec![
2896 DataValue::Integer(3),
2897 DataValue::String("Pending".to_string()),
2898 DataValue::String("Low".to_string()),
2899 DataValue::String("John".to_string()),
2900 ]))
2901 .unwrap();
2902
2903 table
2904 .add_row(DataRow::new(vec![
2905 DataValue::Integer(4),
2906 DataValue::String("In Progress".to_string()),
2907 DataValue::String("Medium".to_string()),
2908 DataValue::String("Jane".to_string()),
2909 ]))
2910 .unwrap();
2911
2912 let table = Arc::new(table);
2913 let engine = QueryEngine::new();
2914
2915 println!("\n=== Testing Complex Logical Expressions ===");
2916 println!("Table has {} rows", table.row_count());
2917 for i in 0..table.row_count() {
2918 let status = table.get_value(i, 1);
2919 let priority = table.get_value(i, 2);
2920 let assigned = table.get_value(i, 3);
2921 println!(
2922 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2923 );
2924 }
2925
2926 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2928 let result = engine.execute(
2929 table.clone(),
2930 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2931 );
2932 match result {
2933 Ok(view) => {
2934 println!(
2935 "SUCCESS: Found {} rows with complex logic",
2936 view.row_count()
2937 );
2938 assert_eq!(view.row_count(), 2); }
2940 Err(e) => {
2941 panic!("Complex logic query failed: {e}");
2942 }
2943 }
2944
2945 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2947 let result = engine.execute(
2948 table.clone(),
2949 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2950 );
2951 match result {
2952 Ok(view) => {
2953 println!(
2954 "SUCCESS: Found {} rows with NOT complex logic",
2955 view.row_count()
2956 );
2957 assert_eq!(view.row_count(), 2); }
2959 Err(e) => {
2960 panic!("NOT complex logic query failed: {e}");
2961 }
2962 }
2963
2964 println!("\n=== Complex logical expressions test complete! ===");
2965 }
2966
2967 #[test]
2968 fn test_mixed_data_types_and_edge_cases() {
2969 let _ = tracing_subscriber::fmt()
2971 .with_max_level(tracing::Level::DEBUG)
2972 .try_init();
2973
2974 let mut table = DataTable::new("test");
2975 table.add_column(DataColumn::new("id"));
2976 table.add_column(DataColumn::new("value"));
2977 table.add_column(DataColumn::new("nullable_field"));
2978
2979 table
2981 .add_row(DataRow::new(vec![
2982 DataValue::Integer(1),
2983 DataValue::String("123.45".to_string()),
2984 DataValue::String("present".to_string()),
2985 ]))
2986 .unwrap();
2987
2988 table
2989 .add_row(DataRow::new(vec![
2990 DataValue::Integer(2),
2991 DataValue::Float(678.90),
2992 DataValue::Null,
2993 ]))
2994 .unwrap();
2995
2996 table
2997 .add_row(DataRow::new(vec![
2998 DataValue::Integer(3),
2999 DataValue::Boolean(true),
3000 DataValue::String("also present".to_string()),
3001 ]))
3002 .unwrap();
3003
3004 table
3005 .add_row(DataRow::new(vec![
3006 DataValue::Integer(4),
3007 DataValue::String("false".to_string()),
3008 DataValue::Null,
3009 ]))
3010 .unwrap();
3011
3012 let table = Arc::new(table);
3013 let engine = QueryEngine::new();
3014
3015 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3016 println!("Table has {} rows", table.row_count());
3017 for i in 0..table.row_count() {
3018 let value = table.get_value(i, 1);
3019 let nullable = table.get_value(i, 2);
3020 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3021 }
3022
3023 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3025 let result = engine.execute(
3026 table.clone(),
3027 "SELECT * FROM test WHERE value.Contains('true')",
3028 );
3029 match result {
3030 Ok(view) => {
3031 println!(
3032 "SUCCESS: Found {} rows with boolean coercion",
3033 view.row_count()
3034 );
3035 assert_eq!(view.row_count(), 1); }
3037 Err(e) => {
3038 panic!("Boolean coercion query failed: {e}");
3039 }
3040 }
3041
3042 println!("\n--- Test: id IN (1, 3) ---");
3044 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3045 match result {
3046 Ok(view) => {
3047 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3048 assert_eq!(view.row_count(), 2); }
3050 Err(e) => {
3051 panic!("Multiple IN values query failed: {e}");
3052 }
3053 }
3054
3055 println!("\n=== Mixed data types test complete! ===");
3056 }
3057
3058 #[test]
3060 fn test_aggregate_only_single_row() {
3061 let table = create_test_stock_data();
3062 let engine = QueryEngine::new();
3063
3064 let result = engine
3066 .execute(
3067 table.clone(),
3068 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3069 )
3070 .expect("Query should succeed");
3071
3072 assert_eq!(
3073 result.row_count(),
3074 1,
3075 "Aggregate-only query should return exactly 1 row"
3076 );
3077 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3078
3079 let source = result.source();
3081 let row = source.get_row(0).expect("Should have first row");
3082
3083 assert_eq!(row.values[0], DataValue::Integer(5));
3085
3086 assert_eq!(row.values[1], DataValue::Float(99.5));
3088
3089 assert_eq!(row.values[2], DataValue::Float(105.0));
3091
3092 if let DataValue::Float(avg) = &row.values[3] {
3094 assert!(
3095 (avg - 102.4).abs() < 0.01,
3096 "Average should be approximately 102.4, got {}",
3097 avg
3098 );
3099 } else {
3100 panic!("AVG should return a Float value");
3101 }
3102 }
3103
3104 #[test]
3106 fn test_single_aggregate_single_row() {
3107 let table = create_test_stock_data();
3108 let engine = QueryEngine::new();
3109
3110 let result = engine
3111 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3112 .expect("Query should succeed");
3113
3114 assert_eq!(
3115 result.row_count(),
3116 1,
3117 "Single aggregate query should return exactly 1 row"
3118 );
3119 assert_eq!(result.column_count(), 1, "Should have 1 column");
3120
3121 let source = result.source();
3122 let row = source.get_row(0).expect("Should have first row");
3123 assert_eq!(row.values[0], DataValue::Integer(5));
3124 }
3125
3126 #[test]
3128 fn test_aggregate_with_where_single_row() {
3129 let table = create_test_stock_data();
3130 let engine = QueryEngine::new();
3131
3132 let result = engine
3134 .execute(
3135 table.clone(),
3136 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3137 )
3138 .expect("Query should succeed");
3139
3140 assert_eq!(
3141 result.row_count(),
3142 1,
3143 "Filtered aggregate query should return exactly 1 row"
3144 );
3145 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3146
3147 let source = result.source();
3148 let row = source.get_row(0).expect("Should have first row");
3149
3150 assert_eq!(row.values[0], DataValue::Integer(2));
3152 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
3155
3156 #[test]
3157 fn test_not_in_parsing() {
3158 use crate::sql::recursive_parser::Parser;
3159
3160 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3161 println!("\n=== Testing NOT IN parsing ===");
3162 println!("Parsing query: {query}");
3163
3164 let mut parser = Parser::new(query);
3165 match parser.parse() {
3166 Ok(statement) => {
3167 println!("Parsed statement: {statement:#?}");
3168 if let Some(where_clause) = statement.where_clause {
3169 println!("WHERE conditions: {:#?}", where_clause.conditions);
3170 if let Some(first_condition) = where_clause.conditions.first() {
3171 println!("First condition expression: {:#?}", first_condition.expr);
3172 }
3173 }
3174 }
3175 Err(e) => {
3176 panic!("Parse error: {e}");
3177 }
3178 }
3179 }
3180
3181 fn create_test_stock_data() -> Arc<DataTable> {
3183 let mut table = DataTable::new("stock");
3184
3185 table.add_column(DataColumn::new("symbol"));
3186 table.add_column(DataColumn::new("close"));
3187 table.add_column(DataColumn::new("volume"));
3188
3189 let test_data = vec![
3191 ("AAPL", 99.5, 1000),
3192 ("AAPL", 101.2, 1500),
3193 ("AAPL", 103.5, 2000),
3194 ("AAPL", 105.0, 1200),
3195 ("AAPL", 102.8, 1800),
3196 ];
3197
3198 for (symbol, close, volume) in test_data {
3199 table
3200 .add_row(DataRow::new(vec![
3201 DataValue::String(symbol.to_string()),
3202 DataValue::Float(close),
3203 DataValue::Integer(volume),
3204 ]))
3205 .expect("Should add row successfully");
3206 }
3207
3208 Arc::new(table)
3209 }
3210}
3211
3212#[cfg(test)]
3213#[path = "query_engine_tests.rs"]
3214mod query_engine_tests;