1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::recursive_parser::{
27 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
28 TableFunction,
29};
30
/// Alias bookkeeping consulted while resolving column references for a query.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Maps each registered alias to the table name it was registered for.
    alias_map: HashMap<String, String>,
}
38
39impl ExecutionContext {
40 pub fn new() -> Self {
42 Self {
43 alias_map: HashMap::new(),
44 }
45 }
46
47 pub fn register_alias(&mut self, alias: String, table_name: String) {
49 debug!("Registering alias: {} -> {}", alias, table_name);
50 self.alias_map.insert(alias, table_name);
51 }
52
53 pub fn resolve_alias(&self, name: &str) -> String {
56 self.alias_map
57 .get(name)
58 .cloned()
59 .unwrap_or_else(|| name.to_string())
60 }
61
62 pub fn is_alias(&self, name: &str) -> bool {
64 self.alias_map.contains_key(name)
65 }
66
67 pub fn get_aliases(&self) -> HashMap<String, String> {
69 self.alias_map.clone()
70 }
71
72 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
87 if let Some(table_prefix) = &column_ref.table_prefix {
88 let actual_table = self.resolve_alias(table_prefix);
90
91 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
93 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
94 debug!(
95 "Resolved {}.{} -> qualified column '{}' at index {}",
96 table_prefix, column_ref.name, qualified_name, idx
97 );
98 return Ok(idx);
99 }
100
101 if let Some(idx) = table.get_column_index(&column_ref.name) {
103 debug!(
104 "Resolved {}.{} -> unqualified column '{}' at index {}",
105 table_prefix, column_ref.name, column_ref.name, idx
106 );
107 return Ok(idx);
108 }
109
110 Err(anyhow!(
112 "Column '{}' not found. Table '{}' may not support qualified column names",
113 qualified_name,
114 actual_table
115 ))
116 } else {
117 if let Some(idx) = table.get_column_index(&column_ref.name) {
119 debug!(
120 "Resolved unqualified column '{}' at index {}",
121 column_ref.name, idx
122 );
123 return Ok(idx);
124 }
125
126 if column_ref.name.contains('.') {
128 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
129 debug!(
130 "Resolved '{}' as qualified column at index {}",
131 column_ref.name, idx
132 );
133 return Ok(idx);
134 }
135 }
136
137 let suggestion = self.find_similar_column(table, &column_ref.name);
139 match suggestion {
140 Some(similar) => Err(anyhow!(
141 "Column '{}' not found. Did you mean '{}'?",
142 column_ref.name,
143 similar
144 )),
145 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
146 }
147 }
148 }
149
150 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
152 let columns = table.column_names();
153 let mut best_match: Option<(String, usize)> = None;
154
155 for col in columns {
156 let distance = edit_distance(name, &col);
157 if distance <= 2 {
158 match best_match {
160 Some((_, best_dist)) if distance < best_dist => {
161 best_match = Some((col.clone(), distance));
162 }
163 None => {
164 best_match = Some((col.clone(), distance));
165 }
166 _ => {}
167 }
168 }
169 }
170
171 best_match.map(|(name, _)| name)
172 }
173}
174
/// The default context is the empty one produced by `ExecutionContext::new`.
impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}
180
/// Computes the Levenshtein distance between `a` and `b`, counted in `char`s:
/// the minimum number of single-character insertions, deletions and
/// substitutions that turn one string into the other.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    if source.is_empty() {
        return target.len();
    }
    if target.is_empty() {
        return source.len();
    }

    // Only the previous and the current row of the DP table are ever read.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current: Vec<usize> = vec![0; target.len() + 1];

    for (row, src_char) in source.iter().enumerate() {
        current[0] = row + 1;
        for (col, tgt_char) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(src_char != tgt_char);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    // After the final swap the last computed row lives in `previous`.
    previous[target.len()]
}
221
/// Executes SQL text or parsed `SelectStatement`s against a `DataTable`,
/// producing `DataView`s.
#[derive(Clone)]
pub struct QueryEngine {
    /// Handed to the WHERE evaluation context and to the hash-join executor.
    case_insensitive: bool,
    /// Date notation passed to the arithmetic evaluator; every constructor in
    /// this file fills it from `get_date_notation()`.
    date_notation: String,
    /// Set by `with_behavior_config`; not read in the code visible here.
    _behavior_config: Option<BehaviorConfig>,
}
229
/// The default engine is the one produced by `QueryEngine::new`.
impl Default for QueryEngine {
    fn default() -> Self {
        Self::new()
    }
}
235
236impl QueryEngine {
    /// Creates an engine with `case_insensitive` off, the date notation
    /// returned by `get_date_notation()`, and no behavior config.
    #[must_use]
    pub fn new() -> Self {
        Self {
            case_insensitive: false,
            date_notation: get_date_notation(),
            _behavior_config: None,
        }
    }
245
246 #[must_use]
247 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
248 let case_insensitive = config.case_insensitive_default;
249 let date_notation = get_date_notation();
251 Self {
252 case_insensitive,
253 date_notation,
254 _behavior_config: Some(config),
255 }
256 }
257
258 #[must_use]
259 pub fn with_date_notation(_date_notation: String) -> Self {
260 Self {
261 case_insensitive: false,
262 date_notation: get_date_notation(), _behavior_config: None,
264 }
265 }
266
267 #[must_use]
268 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
269 Self {
270 case_insensitive,
271 date_notation: get_date_notation(),
272 _behavior_config: None,
273 }
274 }
275
276 #[must_use]
277 pub fn with_case_insensitive_and_date_notation(
278 case_insensitive: bool,
279 _date_notation: String, ) -> Self {
281 Self {
282 case_insensitive,
283 date_notation: get_date_notation(), _behavior_config: None,
285 }
286 }
287
288 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
290 let columns = table.column_names();
291 let mut best_match: Option<(String, usize)> = None;
292
293 for col in columns {
294 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
295 let max_distance = if name.len() > 10 { 3 } else { 2 };
298 if distance <= max_distance {
299 match &best_match {
300 None => best_match = Some((col, distance)),
301 Some((_, best_dist)) if distance < *best_dist => {
302 best_match = Some((col, distance));
303 }
304 _ => {}
305 }
306 }
307 }
308
309 best_match.map(|(name, _)| name)
310 }
311
312 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
314 let len1 = s1.len();
315 let len2 = s2.len();
316 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
317
318 for i in 0..=len1 {
319 matrix[i][0] = i;
320 }
321 for j in 0..=len2 {
322 matrix[0][j] = j;
323 }
324
325 for (i, c1) in s1.chars().enumerate() {
326 for (j, c2) in s2.chars().enumerate() {
327 let cost = usize::from(c1 != c2);
328 matrix[i + 1][j + 1] = std::cmp::min(
329 matrix[i][j + 1] + 1, std::cmp::min(
331 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
334 );
335 }
336 }
337
338 matrix[len1][len2]
339 }
340
341 fn contains_unnest(expr: &SqlExpression) -> bool {
343 match expr {
344 SqlExpression::Unnest { .. } => true,
346 SqlExpression::FunctionCall { name, args, .. } => {
347 if name.to_uppercase() == "UNNEST" {
348 return true;
349 }
350 args.iter().any(Self::contains_unnest)
352 }
353 SqlExpression::BinaryOp { left, right, .. } => {
354 Self::contains_unnest(left) || Self::contains_unnest(right)
355 }
356 SqlExpression::Not { expr } => Self::contains_unnest(expr),
357 SqlExpression::CaseExpression {
358 when_branches,
359 else_branch,
360 } => {
361 when_branches.iter().any(|branch| {
362 Self::contains_unnest(&branch.condition)
363 || Self::contains_unnest(&branch.result)
364 }) || else_branch
365 .as_ref()
366 .map_or(false, |e| Self::contains_unnest(e))
367 }
368 SqlExpression::SimpleCaseExpression {
369 expr,
370 when_branches,
371 else_branch,
372 } => {
373 Self::contains_unnest(expr)
374 || when_branches.iter().any(|branch| {
375 Self::contains_unnest(&branch.value)
376 || Self::contains_unnest(&branch.result)
377 })
378 || else_branch
379 .as_ref()
380 .map_or(false, |e| Self::contains_unnest(e))
381 }
382 SqlExpression::InList { expr, values } => {
383 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
384 }
385 SqlExpression::NotInList { expr, values } => {
386 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
387 }
388 SqlExpression::Between { expr, lower, upper } => {
389 Self::contains_unnest(expr)
390 || Self::contains_unnest(lower)
391 || Self::contains_unnest(upper)
392 }
393 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
394 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
397 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::ChainedMethodCall { base, args, .. } => {
399 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
400 }
401 _ => false,
402 }
403 }
404
405 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407 let (view, _plan) = self.execute_with_plan(table, sql)?;
408 Ok(view)
409 }
410
411 pub fn execute_with_temp_tables(
413 &self,
414 table: Arc<DataTable>,
415 sql: &str,
416 temp_tables: Option<&TempTableRegistry>,
417 ) -> Result<DataView> {
418 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419 Ok(view)
420 }
421
    /// Runs an already-parsed `statement` against `table` without any temp
    /// tables; delegates to `execute_statement_with_temp_tables` with `None`.
    pub fn execute_statement(
        &self,
        table: Arc<DataTable>,
        statement: SelectStatement,
    ) -> Result<DataView> {
        self.execute_statement_with_temp_tables(table, statement, None)
    }
430
431 pub fn execute_statement_with_temp_tables(
433 &self,
434 table: Arc<DataTable>,
435 statement: SelectStatement,
436 temp_tables: Option<&TempTableRegistry>,
437 ) -> Result<DataView> {
438 let mut cte_context = HashMap::new();
440
441 if let Some(temp_registry) = temp_tables {
443 for table_name in temp_registry.list_tables() {
444 if let Some(temp_table) = temp_registry.get(&table_name) {
445 debug!("Adding temp table {} to CTE context", table_name);
446 let view = DataView::new(temp_table);
447 cte_context.insert(table_name, Arc::new(view));
448 }
449 }
450 }
451
452 for cte in &statement.ctes {
453 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454 let cte_result = match &cte.cte_type {
456 CTEType::Standard(query) => {
457 let view = self.build_view_with_context(
459 table.clone(),
460 query.clone(),
461 &mut cte_context,
462 )?;
463
464 let mut materialized = self.materialize_view(view)?;
466
467 for column in materialized.columns_mut() {
469 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470 column.source_table = Some(cte.name.clone());
471 }
472
473 DataView::new(Arc::new(materialized))
474 }
475 CTEType::Web(web_spec) => {
476 use crate::web::http_fetcher::WebDataFetcher;
478
479 let fetcher = WebDataFetcher::new()?;
480 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483 for column in data_table.columns_mut() {
485 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486 column.source_table = Some(cte.name.clone());
487 }
488
489 DataView::new(Arc::new(data_table))
491 }
492 };
493 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495 debug!(
496 "QueryEngine: CTE '{}' pre-processed, stored in context",
497 cte.name
498 );
499 }
500
501 let mut subquery_executor =
503 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506 self.build_view_with_context(table, processed_statement, &mut cte_context)
508 }
509
510 pub fn execute_statement_with_cte_context(
512 &self,
513 table: Arc<DataTable>,
514 statement: SelectStatement,
515 cte_context: &HashMap<String, Arc<DataView>>,
516 ) -> Result<DataView> {
517 let mut local_context = cte_context.clone();
519
520 for cte in &statement.ctes {
522 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523 let cte_result = match &cte.cte_type {
524 CTEType::Standard(query) => {
525 let view = self.build_view_with_context(
526 table.clone(),
527 query.clone(),
528 &mut local_context,
529 )?;
530
531 let mut materialized = self.materialize_view(view)?;
533
534 for column in materialized.columns_mut() {
536 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537 column.source_table = Some(cte.name.clone());
538 }
539
540 DataView::new(Arc::new(materialized))
541 }
542 CTEType::Web(web_spec) => {
543 use crate::web::http_fetcher::WebDataFetcher;
545
546 let fetcher = WebDataFetcher::new()?;
547 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550 for column in data_table.columns_mut() {
552 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553 column.source_table = Some(cte.name.clone());
554 }
555
556 DataView::new(Arc::new(data_table))
558 }
559 };
560 local_context.insert(cte.name.clone(), Arc::new(cte_result));
561 }
562
563 let mut subquery_executor =
565 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568 self.build_view_with_context(table, processed_statement, &mut local_context)
570 }
571
    /// Parses and runs `sql` against `table`, returning the view together with
    /// the recorded execution plan; delegates to
    /// `execute_with_plan_and_temp_tables` with no temp tables.
    pub fn execute_with_plan(
        &self,
        table: Arc<DataTable>,
        sql: &str,
    ) -> Result<(DataView, ExecutionPlan)> {
        self.execute_with_plan_and_temp_tables(table, sql, None)
    }
580
    /// Parses `sql`, executes it against `table`, and returns the resulting
    /// view together with the execution plan recorded along the way.
    ///
    /// Phases, in order: parse; seed the CTE context with temp tables (if a
    /// registry is given); evaluate each CTE of the statement into the
    /// context; process subqueries; build the final view.
    ///
    /// # Errors
    ///
    /// Returns a "Parse error: ..." error when parsing fails, and propagates
    /// failures from CTE evaluation, web fetching, subquery execution and view
    /// construction.
    pub fn execute_with_plan_and_temp_tables(
        &self,
        table: Arc<DataTable>,
        sql: &str,
        temp_tables: Option<&TempTableRegistry>,
    ) -> Result<(DataView, ExecutionPlan)> {
        let mut plan_builder = ExecutionPlanBuilder::new();
        let start_time = Instant::now();

        // --- Parse step ---
        plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
        plan_builder.add_detail(format!("Query: {}", sql));
        let mut parser = Parser::new(sql);
        let statement = parser
            .parse()
            .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
        plan_builder.add_detail(format!("Parsed successfully"));
        if let Some(from) = &statement.from_table {
            plan_builder.add_detail(format!("FROM: {}", from));
        }
        if statement.where_clause.is_some() {
            plan_builder.add_detail("WHERE clause present".to_string());
        }
        plan_builder.end_step();

        // Name -> view map consulted when a FROM/JOIN names a CTE or temp table.
        let mut cte_context = HashMap::new();

        // Temp tables go in first; a CTE with the same name inserted later replaces them.
        if let Some(temp_registry) = temp_tables {
            for table_name in temp_registry.list_tables() {
                if let Some(temp_table) = temp_registry.get(&table_name) {
                    debug!("Adding temp table {} to CTE context", table_name);
                    let view = DataView::new(temp_table);
                    cte_context.insert(table_name, Arc::new(view));
                }
            }
        }

        // --- CTE step: one outer step wrapping one nested step per CTE ---
        if !statement.ctes.is_empty() {
            plan_builder.begin_step(
                StepType::CTE,
                format!("Process {} CTEs", statement.ctes.len()),
            );

            for cte in &statement.ctes {
                let cte_start = Instant::now();
                plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));

                let cte_result = match &cte.cte_type {
                    CTEType::Standard(query) => {
                        if let Some(from) = &query.from_table {
                            plan_builder.add_detail(format!("Source: {}", from));
                        }
                        if query.where_clause.is_some() {
                            plan_builder.add_detail("Has WHERE clause".to_string());
                        }
                        if query.group_by.is_some() {
                            plan_builder.add_detail("Has GROUP BY".to_string());
                        }

                        debug!(
                            "QueryEngine: Processing CTE '{}' with existing context: {:?}",
                            cte.name,
                            cte_context.keys().collect::<Vec<_>>()
                        );

                        // Subqueries inside the CTE are processed against a snapshot
                        // of the context built so far.
                        let mut subquery_executor = SubqueryExecutor::with_cte_context(
                            self.clone(),
                            table.clone(),
                            cte_context.clone(),
                        );
                        let processed_query = subquery_executor.execute_subqueries(query)?;

                        let view = self.build_view_with_context(
                            table.clone(),
                            processed_query,
                            &mut cte_context,
                        )?;

                        let mut materialized = self.materialize_view(view)?;

                        // Tag every column with the CTE name.
                        for column in materialized.columns_mut() {
                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                            column.source_table = Some(cte.name.clone());
                        }

                        DataView::new(Arc::new(materialized))
                    }
                    CTEType::Web(web_spec) => {
                        plan_builder.add_detail(format!("URL: {}", web_spec.url));
                        if let Some(format) = &web_spec.format {
                            plan_builder.add_detail(format!("Format: {:?}", format));
                        }
                        if let Some(cache) = web_spec.cache_seconds {
                            plan_builder.add_detail(format!("Cache: {} seconds", cache));
                        }

                        use crate::web::http_fetcher::WebDataFetcher;

                        let fetcher = WebDataFetcher::new()?;
                        let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;

                        // Tag every column with the CTE name.
                        for column in data_table.columns_mut() {
                            column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
                            column.source_table = Some(cte.name.clone());
                        }

                        DataView::new(Arc::new(data_table))
                    }
                };

                plan_builder.set_rows_out(cte_result.row_count());
                plan_builder.add_detail(format!(
                    "Result: {} rows, {} columns",
                    cte_result.row_count(),
                    cte_result.column_count()
                ));
                plan_builder.add_detail(format!(
                    "Execution time: {:.3}ms",
                    cte_start.elapsed().as_secs_f64() * 1000.0
                ));

                debug!(
                    "QueryEngine: Storing CTE '{}' in context with {} rows",
                    cte.name,
                    cte_result.row_count()
                );
                cte_context.insert(cte.name.clone(), Arc::new(cte_result));
                plan_builder.end_step();
            }

            plan_builder.add_detail(format!(
                "All {} CTEs cached in context",
                statement.ctes.len()
            ));
            plan_builder.end_step();
        }

        // --- Subquery step ---
        plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
        let mut subquery_executor =
            SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());

        // Detection is textual: it looks for "Subquery" in the Debug rendering of the
        // WHERE clause. It only affects the plan details; subqueries are executed either way.
        let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
            format!("{:?}", w).contains("Subquery")
        });

        if has_subqueries {
            plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
        }

        let processed_statement = subquery_executor.execute_subqueries(&statement)?;

        if has_subqueries {
            plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
        } else {
            plan_builder.add_detail("No subqueries to process".to_string());
        }

        plan_builder.end_step();
        // --- Final view construction; further steps are recorded into the same plan ---
        let result = self.build_view_with_context_and_plan(
            table,
            processed_statement,
            &mut cte_context,
            &mut plan_builder,
        )?;

        let total_duration = start_time.elapsed();
        info!(
            "Query execution complete: total={:?}, rows={}",
            total_duration,
            result.row_count()
        );

        let plan = plan_builder.build();
        Ok((result, plan))
    }
772
773 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775 let mut cte_context = HashMap::new();
776 self.build_view_with_context(table, statement, &mut cte_context)
777 }
778
779 fn build_view_with_context(
781 &self,
782 table: Arc<DataTable>,
783 statement: SelectStatement,
784 cte_context: &mut HashMap<String, Arc<DataView>>,
785 ) -> Result<DataView> {
786 let mut dummy_plan = ExecutionPlanBuilder::new();
787 let mut exec_context = ExecutionContext::new();
788 self.build_view_with_context_and_plan_and_exec(
789 table,
790 statement,
791 cte_context,
792 &mut dummy_plan,
793 &mut exec_context,
794 )
795 }
796
797 fn build_view_with_context_and_plan(
799 &self,
800 table: Arc<DataTable>,
801 statement: SelectStatement,
802 cte_context: &mut HashMap<String, Arc<DataView>>,
803 plan: &mut ExecutionPlanBuilder,
804 ) -> Result<DataView> {
805 let mut exec_context = ExecutionContext::new();
806 self.build_view_with_context_and_plan_and_exec(
807 table,
808 statement,
809 cte_context,
810 plan,
811 &mut exec_context,
812 )
813 }
814
815 fn build_view_with_context_and_plan_and_exec(
817 &self,
818 table: Arc<DataTable>,
819 statement: SelectStatement,
820 cte_context: &mut HashMap<String, Arc<DataView>>,
821 plan: &mut ExecutionPlanBuilder,
822 exec_context: &mut ExecutionContext,
823 ) -> Result<DataView> {
824 for cte in &statement.ctes {
826 if cte_context.contains_key(&cte.name) {
828 debug!(
829 "QueryEngine: CTE '{}' already in context, skipping",
830 cte.name
831 );
832 continue;
833 }
834
835 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836 debug!(
837 "QueryEngine: Available CTEs for '{}': {:?}",
838 cte.name,
839 cte_context.keys().collect::<Vec<_>>()
840 );
841
842 let cte_result = match &cte.cte_type {
844 CTEType::Standard(query) => {
845 let view =
846 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848 let mut materialized = self.materialize_view(view)?;
850
851 for column in materialized.columns_mut() {
853 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854 column.source_table = Some(cte.name.clone());
855 }
856
857 DataView::new(Arc::new(materialized))
858 }
859 CTEType::Web(_web_spec) => {
860 return Err(anyhow!(
862 "Web CTEs should be processed in execute_select method"
863 ));
864 }
865 };
866
867 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869 debug!(
870 "QueryEngine: CTE '{}' processed, stored in context",
871 cte.name
872 );
873 }
874
875 let source_table = if let Some(ref table_func) = statement.from_function {
877 debug!("QueryEngine: Processing table function...");
879 match table_func {
880 TableFunction::Generator { name, args } => {
881 use crate::sql::generators::GeneratorRegistry;
883
884 let registry = GeneratorRegistry::new();
886
887 if let Some(generator) = registry.get(name) {
888 let mut evaluator = ArithmeticEvaluator::with_date_notation(
890 &table,
891 self.date_notation.clone(),
892 );
893 let dummy_row = 0;
894
895 let mut evaluated_args = Vec::new();
896 for arg in args {
897 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898 }
899
900 generator.generate(evaluated_args)?
902 } else {
903 return Err(anyhow!("Unknown generator function: {}", name));
904 }
905 }
906 }
907 } else if let Some(ref subquery) = statement.from_subquery {
908 debug!("QueryEngine: Processing FROM subquery...");
910 let subquery_result =
911 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913 let materialized = self.materialize_view(subquery_result)?;
916 Arc::new(materialized)
917 } else if let Some(ref table_name) = statement.from_table {
918 if let Some(cte_view) = cte_context.get(table_name) {
920 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921 let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924 if let Some(ref alias) = statement.from_alias {
926 debug!(
927 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928 alias, table_name
929 );
930 for column in materialized.columns_mut() {
931 if let Some(ref qualified_name) = column.qualified_name {
933 if qualified_name.starts_with(&format!("{}.", table_name)) {
934 column.qualified_name =
935 Some(qualified_name.replace(
936 &format!("{}.", table_name),
937 &format!("{}.", alias),
938 ));
939 }
940 }
941 if column.source_table.as_ref() == Some(table_name) {
943 column.source_table = Some(alias.clone());
944 }
945 }
946 }
947
948 Arc::new(materialized)
949 } else {
950 table.clone()
952 }
953 } else {
954 table.clone()
956 };
957
958 if let Some(ref alias) = statement.from_alias {
960 if let Some(ref table_name) = statement.from_table {
961 exec_context.register_alias(alias.clone(), table_name.clone());
962 }
963 }
964
965 let final_table = if !statement.joins.is_empty() {
967 plan.begin_step(
968 StepType::Join,
969 format!("Process {} JOINs", statement.joins.len()),
970 );
971 plan.set_rows_in(source_table.row_count());
972
973 let join_executor = HashJoinExecutor::new(self.case_insensitive);
974 let mut current_table = source_table;
975
976 for (idx, join_clause) in statement.joins.iter().enumerate() {
977 let join_start = Instant::now();
978 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981 plan.add_detail(format!(
982 "Executing {:?} JOIN on {} condition(s)",
983 join_clause.join_type,
984 join_clause.condition.conditions.len()
985 ));
986
987 let right_table = match &join_clause.table {
989 TableSource::Table(name) => {
990 if let Some(cte_view) = cte_context.get(name) {
992 let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994 if let Some(ref alias) = join_clause.alias {
996 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997 for column in materialized.columns_mut() {
998 if let Some(ref qualified_name) = column.qualified_name {
1000 if qualified_name.starts_with(&format!("{}.", name)) {
1001 column.qualified_name = Some(qualified_name.replace(
1002 &format!("{}.", name),
1003 &format!("{}.", alias),
1004 ));
1005 }
1006 }
1007 if column.source_table.as_ref() == Some(name) {
1009 column.source_table = Some(alias.clone());
1010 }
1011 }
1012 }
1013
1014 Arc::new(materialized)
1015 } else {
1016 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019 }
1020 }
1021 TableSource::DerivedTable { query, alias: _ } => {
1022 let subquery_result = self.build_view_with_context(
1024 table.clone(),
1025 *query.clone(),
1026 cte_context,
1027 )?;
1028 let materialized = self.materialize_view(subquery_result)?;
1029 Arc::new(materialized)
1030 }
1031 };
1032
1033 let joined = join_executor.execute_join(
1035 current_table.clone(),
1036 join_clause,
1037 right_table.clone(),
1038 )?;
1039
1040 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041 plan.set_rows_out(joined.row_count());
1042 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043 plan.add_detail(format!(
1044 "Join time: {:.3}ms",
1045 join_start.elapsed().as_secs_f64() * 1000.0
1046 ));
1047 plan.end_step();
1048
1049 current_table = Arc::new(joined);
1050 }
1051
1052 plan.set_rows_out(current_table.row_count());
1053 plan.add_detail(format!(
1054 "Final result after all joins: {} rows",
1055 current_table.row_count()
1056 ));
1057 plan.end_step();
1058 current_table
1059 } else {
1060 source_table
1061 };
1062
1063 self.build_view_internal_with_plan_and_exec(
1065 final_table,
1066 statement,
1067 plan,
1068 Some(exec_context),
1069 )
1070 }
1071
1072 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074 let source = view.source();
1075 let mut result_table = DataTable::new("derived");
1076
1077 let visible_cols = view.visible_column_indices().to_vec();
1079
1080 for col_idx in &visible_cols {
1082 let col = &source.columns[*col_idx];
1083 let new_col = DataColumn {
1084 name: col.name.clone(),
1085 data_type: col.data_type.clone(),
1086 nullable: col.nullable,
1087 unique_values: col.unique_values,
1088 null_count: col.null_count,
1089 metadata: col.metadata.clone(),
1090 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1093 result_table.add_column(new_col);
1094 }
1095
1096 for row_idx in view.visible_row_indices() {
1098 let source_row = &source.rows[*row_idx];
1099 let mut new_row = DataRow { values: Vec::new() };
1100
1101 for col_idx in &visible_cols {
1102 new_row.values.push(source_row.values[*col_idx].clone());
1103 }
1104
1105 result_table.add_row(new_row);
1106 }
1107
1108 Ok(result_table)
1109 }
1110
1111 fn build_view_internal(
1112 &self,
1113 table: Arc<DataTable>,
1114 statement: SelectStatement,
1115 ) -> Result<DataView> {
1116 let mut dummy_plan = ExecutionPlanBuilder::new();
1117 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118 }
1119
    /// Builds the view for `statement` directly over `table`, recording steps
    /// into `plan`; delegates to `build_view_internal_with_plan_and_exec`
    /// with no execution context.
    fn build_view_internal_with_plan(
        &self,
        table: Arc<DataTable>,
        statement: SelectStatement,
        plan: &mut ExecutionPlanBuilder,
    ) -> Result<DataView> {
        self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
    }
1128
1129 fn build_view_internal_with_plan_and_exec(
1130 &self,
1131 table: Arc<DataTable>,
1132 statement: SelectStatement,
1133 plan: &mut ExecutionPlanBuilder,
1134 exec_context: Option<&ExecutionContext>,
1135 ) -> Result<DataView> {
1136 debug!(
1137 "QueryEngine::build_view - select_items: {:?}",
1138 statement.select_items
1139 );
1140 debug!(
1141 "QueryEngine::build_view - where_clause: {:?}",
1142 statement.where_clause
1143 );
1144
1145 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148 if let Some(where_clause) = &statement.where_clause {
1150 let total_rows = table.row_count();
1151 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155 plan.set_rows_in(total_rows);
1156 plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158 for condition in &where_clause.conditions {
1160 plan.add_detail(format!("Condition: {:?}", condition.expr));
1161 }
1162
1163 let filter_start = Instant::now();
1164 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167 let mut evaluator = if let Some(exec_ctx) = exec_context {
1169 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1171 } else {
1172 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1173 };
1174
1175 let mut filtered_rows = Vec::new();
1177 for row_idx in visible_rows {
1178 if row_idx < 3 {
1180 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1181 }
1182
1183 match evaluator.evaluate(where_clause, row_idx) {
1184 Ok(result) => {
1185 if row_idx < 3 {
1186 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1187 }
1188 if result {
1189 filtered_rows.push(row_idx);
1190 }
1191 }
1192 Err(e) => {
1193 if row_idx < 3 {
1194 debug!(
1195 "QueryEngine: WHERE evaluation error for row {}: {}",
1196 row_idx, e
1197 );
1198 }
1199 return Err(e);
1201 }
1202 }
1203 }
1204
1205 let (compilations, cache_hits) = eval_context.get_stats();
1207 if compilations > 0 || cache_hits > 0 {
1208 debug!(
1209 "LIKE pattern cache: {} compilations, {} cache hits",
1210 compilations, cache_hits
1211 );
1212 }
1213 visible_rows = filtered_rows;
1214 let filter_duration = filter_start.elapsed();
1215 info!(
1216 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1217 total_rows,
1218 visible_rows.len(),
1219 filter_duration
1220 );
1221
1222 plan.set_rows_out(visible_rows.len());
1223 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1224 plan.add_detail(format!(
1225 "Filter time: {:.3}ms",
1226 filter_duration.as_secs_f64() * 1000.0
1227 ));
1228 plan.end_step();
1229 }
1230
1231 let mut view = DataView::new(table.clone());
1233 view = view.with_rows(visible_rows);
1234
1235 if let Some(group_by_exprs) = &statement.group_by {
1237 if !group_by_exprs.is_empty() {
1238 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1239
1240 plan.begin_step(
1241 StepType::GroupBy,
1242 format!("GROUP BY {} expressions", group_by_exprs.len()),
1243 );
1244 plan.set_rows_in(view.row_count());
1245 plan.add_detail(format!("Input: {} rows", view.row_count()));
1246 for expr in group_by_exprs {
1247 plan.add_detail(format!("Group by: {:?}", expr));
1248 }
1249
1250 let group_start = Instant::now();
1251 view = self.apply_group_by(
1252 view,
1253 group_by_exprs,
1254 &statement.select_items,
1255 statement.having.as_ref(),
1256 plan,
1257 )?;
1258
1259 plan.set_rows_out(view.row_count());
1260 plan.add_detail(format!("Output: {} groups", view.row_count()));
1261 plan.add_detail(format!(
1262 "Overall time: {:.3}ms",
1263 group_start.elapsed().as_secs_f64() * 1000.0
1264 ));
1265 plan.end_step();
1266 }
1267 } else {
1268 if !statement.select_items.is_empty() {
1270 let has_non_star_items = statement
1272 .select_items
1273 .iter()
1274 .any(|item| !matches!(item, SelectItem::Star));
1275
1276 if has_non_star_items || statement.select_items.len() > 1 {
1280 view = self.apply_select_items(
1281 view,
1282 &statement.select_items,
1283 &statement,
1284 exec_context,
1285 )?;
1286 }
1287 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1289 debug!("QueryEngine: Using legacy columns path");
1290 let source_table = view.source();
1293 let column_indices =
1294 self.resolve_column_indices(source_table, &statement.columns)?;
1295 view = view.with_columns(column_indices);
1296 }
1297 }
1298
1299 if statement.distinct {
1301 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1302 plan.set_rows_in(view.row_count());
1303 plan.add_detail(format!("Input: {} rows", view.row_count()));
1304
1305 let distinct_start = Instant::now();
1306 view = self.apply_distinct(view)?;
1307
1308 plan.set_rows_out(view.row_count());
1309 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1310 plan.add_detail(format!(
1311 "Distinct time: {:.3}ms",
1312 distinct_start.elapsed().as_secs_f64() * 1000.0
1313 ));
1314 plan.end_step();
1315 }
1316
1317 if let Some(order_by_columns) = &statement.order_by {
1319 if !order_by_columns.is_empty() {
1320 plan.begin_step(
1321 StepType::Sort,
1322 format!("ORDER BY {} columns", order_by_columns.len()),
1323 );
1324 plan.set_rows_in(view.row_count());
1325 for col in order_by_columns {
1326 plan.add_detail(format!("{} {:?}", col.column, col.direction));
1327 }
1328
1329 let sort_start = Instant::now();
1330 view =
1331 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1332
1333 plan.add_detail(format!(
1334 "Sort time: {:.3}ms",
1335 sort_start.elapsed().as_secs_f64() * 1000.0
1336 ));
1337 plan.end_step();
1338 }
1339 }
1340
1341 if let Some(limit) = statement.limit {
1343 let offset = statement.offset.unwrap_or(0);
1344 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1345 plan.set_rows_in(view.row_count());
1346 if offset > 0 {
1347 plan.add_detail(format!("OFFSET: {}", offset));
1348 }
1349 view = view.with_limit(limit, offset);
1350 plan.set_rows_out(view.row_count());
1351 plan.add_detail(format!("Output: {} rows", view.row_count()));
1352 plan.end_step();
1353 }
1354
1355 if !statement.set_operations.is_empty() {
1357 plan.begin_step(
1358 StepType::SetOperation,
1359 format!("Process {} set operations", statement.set_operations.len()),
1360 );
1361 plan.set_rows_in(view.row_count());
1362
1363 let mut combined_table = self.materialize_view(view)?;
1365 let first_columns = combined_table.column_names();
1366 let first_column_count = first_columns.len();
1367
1368 let mut needs_deduplication = false;
1370
1371 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1373 let op_start = Instant::now();
1374 plan.begin_step(
1375 StepType::SetOperation,
1376 format!("{:?} operation #{}", operation, idx + 1),
1377 );
1378
1379 let next_view = if let Some(exec_ctx) = exec_context {
1382 self.build_view_internal_with_plan_and_exec(
1383 table.clone(),
1384 *next_statement.clone(),
1385 plan,
1386 Some(exec_ctx),
1387 )?
1388 } else {
1389 self.build_view_internal_with_plan(
1390 table.clone(),
1391 *next_statement.clone(),
1392 plan,
1393 )?
1394 };
1395
1396 let next_table = self.materialize_view(next_view)?;
1398 let next_columns = next_table.column_names();
1399 let next_column_count = next_columns.len();
1400
1401 if first_column_count != next_column_count {
1403 return Err(anyhow!(
1404 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1405 first_column_count,
1406 idx + 2,
1407 next_column_count
1408 ));
1409 }
1410
1411 for (col_idx, (first_col, next_col)) in
1413 first_columns.iter().zip(next_columns.iter()).enumerate()
1414 {
1415 if !first_col.eq_ignore_ascii_case(next_col) {
1416 debug!(
1417 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1418 col_idx + 1,
1419 first_col,
1420 next_col
1421 );
1422 }
1423 }
1424
1425 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1426 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1427
1428 match operation {
1430 SetOperation::UnionAll => {
1431 for row in next_table.rows.iter() {
1433 combined_table.add_row(row.clone());
1434 }
1435 plan.add_detail(format!(
1436 "Result: {} rows (no deduplication)",
1437 combined_table.row_count()
1438 ));
1439 }
1440 SetOperation::Union => {
1441 for row in next_table.rows.iter() {
1443 combined_table.add_row(row.clone());
1444 }
1445 needs_deduplication = true;
1446 plan.add_detail(format!(
1447 "Combined: {} rows (deduplication pending)",
1448 combined_table.row_count()
1449 ));
1450 }
1451 SetOperation::Intersect => {
1452 return Err(anyhow!("INTERSECT is not yet implemented"));
1455 }
1456 SetOperation::Except => {
1457 return Err(anyhow!("EXCEPT is not yet implemented"));
1460 }
1461 }
1462
1463 plan.add_detail(format!(
1464 "Operation time: {:.3}ms",
1465 op_start.elapsed().as_secs_f64() * 1000.0
1466 ));
1467 plan.set_rows_out(combined_table.row_count());
1468 plan.end_step();
1469 }
1470
1471 plan.set_rows_out(combined_table.row_count());
1472 plan.add_detail(format!(
1473 "Combined result: {} rows after {} operations",
1474 combined_table.row_count(),
1475 statement.set_operations.len()
1476 ));
1477 plan.end_step();
1478
1479 view = DataView::new(Arc::new(combined_table));
1481
1482 if needs_deduplication {
1484 plan.begin_step(
1485 StepType::Distinct,
1486 "UNION deduplication - remove duplicate rows".to_string(),
1487 );
1488 plan.set_rows_in(view.row_count());
1489 plan.add_detail(format!("Input: {} rows", view.row_count()));
1490
1491 let distinct_start = Instant::now();
1492 view = self.apply_distinct(view)?;
1493
1494 plan.set_rows_out(view.row_count());
1495 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1496 plan.add_detail(format!(
1497 "Deduplication time: {:.3}ms",
1498 distinct_start.elapsed().as_secs_f64() * 1000.0
1499 ));
1500 plan.end_step();
1501 }
1502 }
1503
1504 Ok(view)
1505 }
1506
1507 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1509 let mut indices = Vec::new();
1510 let table_columns = table.column_names();
1511
1512 for col_name in columns {
1513 let index = table_columns
1514 .iter()
1515 .position(|c| c.eq_ignore_ascii_case(col_name))
1516 .ok_or_else(|| {
1517 let suggestion = self.find_similar_column(table, col_name);
1518 match suggestion {
1519 Some(similar) => anyhow::anyhow!(
1520 "Column '{}' not found. Did you mean '{}'?",
1521 col_name,
1522 similar
1523 ),
1524 None => anyhow::anyhow!("Column '{}' not found", col_name),
1525 }
1526 })?;
1527 indices.push(index);
1528 }
1529
1530 Ok(indices)
1531 }
1532
1533 fn apply_select_items(
1535 &self,
1536 view: DataView,
1537 select_items: &[SelectItem],
1538 _statement: &SelectStatement,
1539 exec_context: Option<&ExecutionContext>,
1540 ) -> Result<DataView> {
1541 debug!(
1542 "QueryEngine::apply_select_items - items: {:?}",
1543 select_items
1544 );
1545 debug!(
1546 "QueryEngine::apply_select_items - input view has {} rows",
1547 view.row_count()
1548 );
1549
1550 let has_unnest = select_items.iter().any(|item| match item {
1552 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1553 _ => false,
1554 });
1555
1556 if has_unnest {
1557 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1558 return self.apply_select_with_row_expansion(view, select_items);
1559 }
1560
1561 let has_aggregates = select_items.iter().any(|item| match item {
1565 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1566 SelectItem::Column(_) => false,
1567 SelectItem::Star => false,
1568 });
1569
1570 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1571 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1572 SelectItem::Column(_) => false, SelectItem::Star => false, });
1575
1576 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1577 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1580 return self.apply_aggregate_select(view, select_items);
1581 }
1582
1583 let has_computed_expressions = select_items
1585 .iter()
1586 .any(|item| matches!(item, SelectItem::Expression { .. }));
1587
1588 debug!(
1589 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1590 has_computed_expressions
1591 );
1592
1593 if !has_computed_expressions {
1594 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1596 return Ok(view.with_columns(column_indices));
1597 }
1598
1599 let source_table = view.source();
1604 let visible_rows = view.visible_row_indices();
1605
1606 let mut computed_table = DataTable::new("query_result");
1609
1610 let mut expanded_items = Vec::new();
1612 for item in select_items {
1613 match item {
1614 SelectItem::Star => {
1615 for col_name in source_table.column_names() {
1617 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1618 col_name.to_string(),
1619 )));
1620 }
1621 }
1622 _ => expanded_items.push(item.clone()),
1623 }
1624 }
1625
1626 let mut column_name_counts: std::collections::HashMap<String, usize> =
1628 std::collections::HashMap::new();
1629
1630 for item in &expanded_items {
1631 let base_name = match item {
1632 SelectItem::Column(col_ref) => col_ref.name.clone(),
1633 SelectItem::Expression { alias, .. } => alias.clone(),
1634 SelectItem::Star => unreachable!("Star should have been expanded"),
1635 };
1636
1637 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1639 let column_name = if *count == 0 {
1640 base_name.clone()
1642 } else {
1643 format!("{base_name}_{count}")
1645 };
1646 *count += 1;
1647
1648 computed_table.add_column(DataColumn::new(&column_name));
1649 }
1650
1651 let mut evaluator =
1653 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1654
1655 if let Some(exec_ctx) = exec_context {
1657 let aliases = exec_ctx.get_aliases();
1658 if !aliases.is_empty() {
1659 debug!(
1660 "Applying {} aliases to evaluator: {:?}",
1661 aliases.len(),
1662 aliases
1663 );
1664 evaluator = evaluator.with_table_aliases(aliases);
1665 }
1666 }
1667
1668 for &row_idx in visible_rows {
1669 let mut row_values = Vec::new();
1670
1671 for item in &expanded_items {
1672 let value = match item {
1673 SelectItem::Column(col_ref) => {
1674 match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1676 Ok(val) => val,
1677 Err(e) => {
1678 return Err(anyhow!(
1679 "Failed to evaluate column {}: {}",
1680 col_ref.to_sql(),
1681 e
1682 ));
1683 }
1684 }
1685 }
1686 SelectItem::Expression { expr, .. } => {
1687 evaluator.evaluate(expr, row_idx)?
1689 }
1690 SelectItem::Star => unreachable!("Star should have been expanded"),
1691 };
1692 row_values.push(value);
1693 }
1694
1695 computed_table
1696 .add_row(DataRow::new(row_values))
1697 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1698 }
1699
1700 Ok(DataView::new(Arc::new(computed_table)))
1703 }
1704
1705 fn apply_select_with_row_expansion(
1707 &self,
1708 view: DataView,
1709 select_items: &[SelectItem],
1710 ) -> Result<DataView> {
1711 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1712
1713 let source_table = view.source();
1714 let visible_rows = view.visible_row_indices();
1715 let expander_registry = RowExpanderRegistry::new();
1716
1717 let mut result_table = DataTable::new("unnest_result");
1719
1720 let mut expanded_items = Vec::new();
1722 for item in select_items {
1723 match item {
1724 SelectItem::Star => {
1725 for col_name in source_table.column_names() {
1726 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1727 col_name.to_string(),
1728 )));
1729 }
1730 }
1731 _ => expanded_items.push(item.clone()),
1732 }
1733 }
1734
1735 for item in &expanded_items {
1737 let column_name = match item {
1738 SelectItem::Column(col_ref) => col_ref.name.clone(),
1739 SelectItem::Expression { alias, .. } => alias.clone(),
1740 SelectItem::Star => unreachable!("Star should have been expanded"),
1741 };
1742 result_table.add_column(DataColumn::new(&column_name));
1743 }
1744
1745 let mut evaluator =
1747 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1748
1749 for &row_idx in visible_rows {
1750 let mut unnest_expansions = Vec::new();
1752 let mut unnest_indices = Vec::new();
1753
1754 for (col_idx, item) in expanded_items.iter().enumerate() {
1755 if let SelectItem::Expression { expr, .. } = item {
1756 if let Some(expansion_result) = self.try_expand_unnest(
1757 expr,
1758 source_table,
1759 row_idx,
1760 &mut evaluator,
1761 &expander_registry,
1762 )? {
1763 unnest_expansions.push(expansion_result);
1764 unnest_indices.push(col_idx);
1765 }
1766 }
1767 }
1768
1769 let expansion_count = if unnest_expansions.is_empty() {
1771 1 } else {
1773 unnest_expansions
1774 .iter()
1775 .map(|exp| exp.row_count())
1776 .max()
1777 .unwrap_or(1)
1778 };
1779
1780 for output_idx in 0..expansion_count {
1782 let mut row_values = Vec::new();
1783
1784 for (col_idx, item) in expanded_items.iter().enumerate() {
1785 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1787
1788 let value = if let Some(unnest_idx) = unnest_position {
1789 let expansion = &unnest_expansions[unnest_idx];
1791 expansion
1792 .values
1793 .get(output_idx)
1794 .cloned()
1795 .unwrap_or(DataValue::Null)
1796 } else {
1797 match item {
1799 SelectItem::Column(col_ref) => {
1800 let col_idx =
1801 source_table.get_column_index(&col_ref.name).ok_or_else(
1802 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1803 )?;
1804 let row = source_table
1805 .get_row(row_idx)
1806 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1807 row.get(col_idx)
1808 .ok_or_else(|| {
1809 anyhow::anyhow!("Column {} not found in row", col_idx)
1810 })?
1811 .clone()
1812 }
1813 SelectItem::Expression { expr, .. } => {
1814 evaluator.evaluate(expr, row_idx)?
1816 }
1817 SelectItem::Star => unreachable!(),
1818 }
1819 };
1820
1821 row_values.push(value);
1822 }
1823
1824 result_table
1825 .add_row(DataRow::new(row_values))
1826 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1827 }
1828 }
1829
1830 debug!(
1831 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1832 visible_rows.len(),
1833 result_table.row_count()
1834 );
1835
1836 Ok(DataView::new(Arc::new(result_table)))
1837 }
1838
1839 fn try_expand_unnest(
1842 &self,
1843 expr: &SqlExpression,
1844 _source_table: &DataTable,
1845 row_idx: usize,
1846 evaluator: &mut ArithmeticEvaluator,
1847 expander_registry: &RowExpanderRegistry,
1848 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1849 if let SqlExpression::Unnest { column, delimiter } = expr {
1851 let column_value = evaluator.evaluate(column, row_idx)?;
1853
1854 let delimiter_value = DataValue::String(delimiter.clone());
1856
1857 let expander = expander_registry
1859 .get("UNNEST")
1860 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1861
1862 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1864 return Ok(Some(expansion));
1865 }
1866
1867 if let SqlExpression::FunctionCall { name, args, .. } = expr {
1869 if name.to_uppercase() == "UNNEST" {
1870 if args.len() != 2 {
1872 return Err(anyhow::anyhow!(
1873 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1874 ));
1875 }
1876
1877 let column_value = evaluator.evaluate(&args[0], row_idx)?;
1879
1880 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1882
1883 let expander = expander_registry
1885 .get("UNNEST")
1886 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1887
1888 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1890 return Ok(Some(expansion));
1891 }
1892 }
1893
1894 Ok(None)
1895 }
1896
1897 fn apply_aggregate_select(
1899 &self,
1900 view: DataView,
1901 select_items: &[SelectItem],
1902 ) -> Result<DataView> {
1903 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1904
1905 let source_table = view.source();
1906 let mut result_table = DataTable::new("aggregate_result");
1907
1908 for item in select_items {
1910 let column_name = match item {
1911 SelectItem::Expression { alias, .. } => alias.clone(),
1912 _ => unreachable!("Should only have expressions in aggregate-only query"),
1913 };
1914 result_table.add_column(DataColumn::new(&column_name));
1915 }
1916
1917 let visible_rows = view.visible_row_indices().to_vec();
1919 let mut evaluator =
1920 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1921 .with_visible_rows(visible_rows);
1922
1923 let mut row_values = Vec::new();
1925 for item in select_items {
1926 match item {
1927 SelectItem::Expression { expr, .. } => {
1928 let value = evaluator.evaluate(expr, 0)?;
1931 row_values.push(value);
1932 }
1933 _ => unreachable!("Should only have expressions in aggregate-only query"),
1934 }
1935 }
1936
1937 result_table
1939 .add_row(DataRow::new(row_values))
1940 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1941
1942 Ok(DataView::new(Arc::new(result_table)))
1943 }
1944
1945 fn resolve_select_columns(
1947 &self,
1948 table: &DataTable,
1949 select_items: &[SelectItem],
1950 ) -> Result<Vec<usize>> {
1951 let mut indices = Vec::new();
1952 let table_columns = table.column_names();
1953
1954 for item in select_items {
1955 match item {
1956 SelectItem::Column(col_ref) => {
1957 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1959 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1961 table.find_column_by_qualified_name(&qualified_name)
1962 .ok_or_else(|| {
1963 let has_qualified = table.columns.iter()
1965 .any(|c| c.qualified_name.is_some());
1966 if !has_qualified {
1967 anyhow::anyhow!(
1968 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1969 qualified_name, table_prefix
1970 )
1971 } else {
1972 anyhow::anyhow!("Column '{}' not found", qualified_name)
1973 }
1974 })?
1975 } else {
1976 table_columns
1978 .iter()
1979 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1980 .ok_or_else(|| {
1981 let suggestion = self.find_similar_column(table, &col_ref.name);
1982 match suggestion {
1983 Some(similar) => anyhow::anyhow!(
1984 "Column '{}' not found. Did you mean '{}'?",
1985 col_ref.name,
1986 similar
1987 ),
1988 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1989 }
1990 })?
1991 };
1992 indices.push(index);
1993 }
1994 SelectItem::Star => {
1995 for i in 0..table_columns.len() {
1997 indices.push(i);
1998 }
1999 }
2000 SelectItem::Expression { .. } => {
2001 return Err(anyhow::anyhow!(
2002 "Computed expressions require new table creation"
2003 ));
2004 }
2005 }
2006 }
2007
2008 Ok(indices)
2009 }
2010
2011 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2013 use std::collections::HashSet;
2014
2015 let source = view.source();
2016 let visible_cols = view.visible_column_indices();
2017 let visible_rows = view.visible_row_indices();
2018
2019 let mut seen_rows = HashSet::new();
2021 let mut unique_row_indices = Vec::new();
2022
2023 for &row_idx in visible_rows {
2024 let mut row_key = Vec::new();
2026 for &col_idx in visible_cols {
2027 let value = source
2028 .get_value(row_idx, col_idx)
2029 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2030 row_key.push(format!("{:?}", value));
2032 }
2033
2034 if seen_rows.insert(row_key) {
2036 unique_row_indices.push(row_idx);
2038 }
2039 }
2040
2041 Ok(view.with_rows(unique_row_indices))
2043 }
2044
2045 fn apply_multi_order_by(
2047 &self,
2048 view: DataView,
2049 order_by_columns: &[OrderByColumn],
2050 ) -> Result<DataView> {
2051 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2052 }
2053
2054 fn apply_multi_order_by_with_context(
2056 &self,
2057 mut view: DataView,
2058 order_by_columns: &[OrderByColumn],
2059 _exec_context: Option<&ExecutionContext>,
2060 ) -> Result<DataView> {
2061 let mut sort_columns = Vec::new();
2063
2064 for order_col in order_by_columns {
2065 let col_index = if order_col.column.contains('.') {
2067 if let Some(dot_pos) = order_col.column.rfind('.') {
2069 let col_name = &order_col.column[dot_pos + 1..];
2070
2071 debug!(
2074 "ORDER BY: Extracting unqualified column '{}' from '{}'",
2075 col_name, order_col.column
2076 );
2077 view.source().get_column_index(col_name)
2078 } else {
2079 view.source().get_column_index(&order_col.column)
2080 }
2081 } else {
2082 view.source().get_column_index(&order_col.column)
2084 }
2085 .ok_or_else(|| {
2086 let suggestion = self.find_similar_column(view.source(), &order_col.column);
2088 match suggestion {
2089 Some(similar) => anyhow::anyhow!(
2090 "Column '{}' not found. Did you mean '{}'?",
2091 order_col.column,
2092 similar
2093 ),
2094 None => {
2095 let available_cols = view.source().column_names().join(", ");
2097 anyhow::anyhow!(
2098 "Column '{}' not found. Available columns: {}",
2099 order_col.column,
2100 available_cols
2101 )
2102 }
2103 }
2104 })?;
2105
2106 let ascending = matches!(order_col.direction, SortDirection::Asc);
2107 sort_columns.push((col_index, ascending));
2108 }
2109
2110 view.apply_multi_sort(&sort_columns)?;
2112 Ok(view)
2113 }
2114
2115 fn apply_group_by(
2117 &self,
2118 view: DataView,
2119 group_by_exprs: &[SqlExpression],
2120 select_items: &[SelectItem],
2121 having: Option<&SqlExpression>,
2122 plan: &mut ExecutionPlanBuilder,
2123 ) -> Result<DataView> {
2124 let (result_view, phase_info) = self.apply_group_by_expressions(
2126 view,
2127 group_by_exprs,
2128 select_items,
2129 having,
2130 self.case_insensitive,
2131 self.date_notation.clone(),
2132 )?;
2133
2134 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2136 plan.add_detail(format!(
2137 "Phase 1 - Group Building: {:.3}ms",
2138 phase_info.phase2_key_building.as_secs_f64() * 1000.0
2139 ));
2140 plan.add_detail(format!(
2141 " • Processing {} rows into {} groups",
2142 phase_info.total_rows, phase_info.num_groups
2143 ));
2144 plan.add_detail(format!(
2145 "Phase 2 - Aggregation: {:.3}ms",
2146 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2147 ));
2148 if phase_info.phase4_having_evaluation > Duration::ZERO {
2149 plan.add_detail(format!(
2150 "Phase 3 - HAVING Filter: {:.3}ms",
2151 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2152 ));
2153 plan.add_detail(format!(
2154 " • Filtered {} groups",
2155 phase_info.groups_filtered_by_having
2156 ));
2157 }
2158 plan.add_detail(format!(
2159 "Total GROUP BY time: {:.3}ms",
2160 phase_info.total_time.as_secs_f64() * 1000.0
2161 ));
2162
2163 Ok(result_view)
2164 }
2165
2166 pub fn estimate_group_cardinality(
2169 &self,
2170 view: &DataView,
2171 group_by_exprs: &[SqlExpression],
2172 ) -> usize {
2173 let row_count = view.get_visible_rows().len();
2175 if row_count <= 100 {
2176 return row_count;
2177 }
2178
2179 let sample_size = min(1000, row_count / 10).max(100);
2181 let mut seen = FxHashSet::default();
2182
2183 let visible_rows = view.get_visible_rows();
2184 for (i, &row_idx) in visible_rows.iter().enumerate() {
2185 if i >= sample_size {
2186 break;
2187 }
2188
2189 let mut key_values = Vec::new();
2191 for expr in group_by_exprs {
2192 let mut evaluator = ArithmeticEvaluator::new(view.source());
2193 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2194 key_values.push(value);
2195 }
2196
2197 seen.insert(key_values);
2198 }
2199
2200 let sample_cardinality = seen.len();
2202 let estimated = (sample_cardinality * row_count) / sample_size;
2203
2204 estimated.min(row_count).max(sample_cardinality)
2206 }
2207}
2208
2209#[cfg(test)]
2210mod tests {
2211 use super::*;
2212 use crate::data::datatable::{DataColumn, DataRow, DataValue};
2213
2214 fn create_test_table() -> Arc<DataTable> {
2215 let mut table = DataTable::new("test");
2216
2217 table.add_column(DataColumn::new("id"));
2219 table.add_column(DataColumn::new("name"));
2220 table.add_column(DataColumn::new("age"));
2221
2222 table
2224 .add_row(DataRow::new(vec![
2225 DataValue::Integer(1),
2226 DataValue::String("Alice".to_string()),
2227 DataValue::Integer(30),
2228 ]))
2229 .unwrap();
2230
2231 table
2232 .add_row(DataRow::new(vec![
2233 DataValue::Integer(2),
2234 DataValue::String("Bob".to_string()),
2235 DataValue::Integer(25),
2236 ]))
2237 .unwrap();
2238
2239 table
2240 .add_row(DataRow::new(vec![
2241 DataValue::Integer(3),
2242 DataValue::String("Charlie".to_string()),
2243 DataValue::Integer(35),
2244 ]))
2245 .unwrap();
2246
2247 Arc::new(table)
2248 }
2249
/// `SELECT *` returns every row and every column of the fixture.
#[test]
fn test_select_all() {
    let engine = QueryEngine::new();
    let table = create_test_table();

    let query = "SELECT * FROM users";
    let view = engine.execute(table.clone(), query).unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 3);
}
2261
/// Selecting two named columns keeps all rows but narrows the view to two columns.
#[test]
fn test_select_columns() {
    let engine = QueryEngine::new();
    let table = create_test_table();

    let query = "SELECT name, age FROM users";
    let view = engine.execute(table.clone(), query).unwrap();

    assert_eq!(view.row_count(), 3);
    assert_eq!(view.column_count(), 2);
}
2273
/// `LIMIT 2` caps the result at two rows.
#[test]
fn test_select_with_limit() {
    let engine = QueryEngine::new();
    let table = create_test_table();

    let query = "SELECT * FROM users LIMIT 2";
    let view = engine.execute(table.clone(), query).unwrap();

    assert_eq!(view.row_count(), 2);
}
2284
/// `.Contains()` in a WHERE clause works on a string column with a
/// differently-cased needle and on a float column.
#[test]
fn test_type_coercion_contains() {
    // Another test may already have installed a subscriber; ignore that failure.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "price"] {
        table.add_column(DataColumn::new(column));
    }
    for (id, status, price) in [
        (1, "Pending", 99.99),
        (2, "Confirmed", 150.50),
        (3, "Pending", 75.00),
    ] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::Float(price),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing WHERE clause with Contains ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let status = table.get_value(i, 1);
        println!("Row {i}: status = {status:?}");
    }

    println!("\n--- Test 1: status.Contains('pend') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE status.Contains('pend')",
        )
        .unwrap_or_else(|e| panic!("Query failed: {e}"));
    println!("SUCCESS: Found {} matching rows", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test 2: price.Contains('9') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE price.Contains('9')",
        )
        .unwrap_or_else(|e| panic!("Numeric coercion query failed: {e}"));
    println!(
        "SUCCESS: Found {} matching rows with price containing '9'",
        view.row_count()
    );
    assert!(view.row_count() >= 1);

    println!("\n=== All tests passed! ===");
}
2370
/// `NOT IN ('CA')` filters out the single matching row and keeps the other two.
#[test]
fn test_not_in_clause() {
    // Another test may already have installed a subscriber; ignore that failure.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }
    for (id, country) in [(1, "CA"), (2, "US"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing NOT IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country NOT IN ('CA') ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('CA')",
        )
        .unwrap_or_else(|e| panic!("NOT IN query failed: {e}"));
    println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== NOT IN test complete! ===");
}
2432
/// IN / NOT IN honour the engine's case-insensitivity flag: `'ca'` matches
/// `"CA"` only when the engine is built with `with_case_insensitive(true)`.
#[test]
fn test_case_insensitive_in_and_not_in() {
    // Another test may already have installed a subscriber; ignore that failure.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "country"] {
        table.add_column(DataColumn::new(column));
    }
    // Mixed casing on purpose: upper, lower, upper.
    for (id, country) in [(1, "CA"), (2, "us"), (3, "UK")] {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(country.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);

    println!("\n=== Testing Case-Insensitive IN clause ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let country = table.get_value(i, 1);
        println!("Row {i}: country = {country:?}");
    }

    println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
    let engine = QueryEngine::with_case_insensitive(true);
    let view = engine
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-insensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 1);

    println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE country NOT IN ('ca')",
        )
        .unwrap_or_else(|e| panic!("Case-insensitive NOT IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 2);

    println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
    let engine_case_sensitive = QueryEngine::new();
    let view = engine_case_sensitive
        .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')")
        .unwrap_or_else(|e| panic!("Case-sensitive IN query failed: {e}"));
    println!(
        "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
        view.row_count()
    );
    assert_eq!(view.row_count(), 0);

    println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
}
2531
2532 #[test]
2533 #[ignore = "Parentheses in WHERE clause not yet implemented"]
2534 fn test_parentheses_in_where_clause() {
2535 let _ = tracing_subscriber::fmt()
2537 .with_max_level(tracing::Level::DEBUG)
2538 .try_init();
2539
2540 let mut table = DataTable::new("test");
2541 table.add_column(DataColumn::new("id"));
2542 table.add_column(DataColumn::new("status"));
2543 table.add_column(DataColumn::new("priority"));
2544
2545 table
2547 .add_row(DataRow::new(vec![
2548 DataValue::Integer(1),
2549 DataValue::String("Pending".to_string()),
2550 DataValue::String("High".to_string()),
2551 ]))
2552 .unwrap();
2553
2554 table
2555 .add_row(DataRow::new(vec![
2556 DataValue::Integer(2),
2557 DataValue::String("Complete".to_string()),
2558 DataValue::String("High".to_string()),
2559 ]))
2560 .unwrap();
2561
2562 table
2563 .add_row(DataRow::new(vec![
2564 DataValue::Integer(3),
2565 DataValue::String("Pending".to_string()),
2566 DataValue::String("Low".to_string()),
2567 ]))
2568 .unwrap();
2569
2570 table
2571 .add_row(DataRow::new(vec![
2572 DataValue::Integer(4),
2573 DataValue::String("Complete".to_string()),
2574 DataValue::String("Low".to_string()),
2575 ]))
2576 .unwrap();
2577
2578 let table = Arc::new(table);
2579 let engine = QueryEngine::new();
2580
2581 println!("\n=== Testing Parentheses in WHERE clause ===");
2582 println!("Table has {} rows", table.row_count());
2583 for i in 0..table.row_count() {
2584 let status = table.get_value(i, 1);
2585 let priority = table.get_value(i, 2);
2586 println!("Row {i}: status = {status:?}, priority = {priority:?}");
2587 }
2588
2589 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2591 let result = engine.execute(
2592 table.clone(),
2593 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2594 );
2595 match result {
2596 Ok(view) => {
2597 println!(
2598 "SUCCESS: Found {} rows with parenthetical logic",
2599 view.row_count()
2600 );
2601 assert_eq!(view.row_count(), 2); }
2603 Err(e) => {
2604 panic!("Parentheses query failed: {e}");
2605 }
2606 }
2607
2608 println!("\n=== Parentheses test complete! ===");
2609 }
2610
2611 #[test]
2612 #[ignore = "Numeric type coercion needs fixing"]
2613 fn test_numeric_type_coercion() {
2614 let _ = tracing_subscriber::fmt()
2616 .with_max_level(tracing::Level::DEBUG)
2617 .try_init();
2618
2619 let mut table = DataTable::new("test");
2620 table.add_column(DataColumn::new("id"));
2621 table.add_column(DataColumn::new("price"));
2622 table.add_column(DataColumn::new("quantity"));
2623
2624 table
2626 .add_row(DataRow::new(vec![
2627 DataValue::Integer(1),
2628 DataValue::Float(99.50), DataValue::Integer(100),
2630 ]))
2631 .unwrap();
2632
2633 table
2634 .add_row(DataRow::new(vec![
2635 DataValue::Integer(2),
2636 DataValue::Float(150.0), DataValue::Integer(200),
2638 ]))
2639 .unwrap();
2640
2641 table
2642 .add_row(DataRow::new(vec![
2643 DataValue::Integer(3),
2644 DataValue::Integer(75), DataValue::Integer(50),
2646 ]))
2647 .unwrap();
2648
2649 let table = Arc::new(table);
2650 let engine = QueryEngine::new();
2651
2652 println!("\n=== Testing Numeric Type Coercion ===");
2653 println!("Table has {} rows", table.row_count());
2654 for i in 0..table.row_count() {
2655 let price = table.get_value(i, 1);
2656 let quantity = table.get_value(i, 2);
2657 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
2658 }
2659
2660 println!("\n--- Test: price.Contains('.') ---");
2662 let result = engine.execute(
2663 table.clone(),
2664 "SELECT * FROM test WHERE price.Contains('.')",
2665 );
2666 match result {
2667 Ok(view) => {
2668 println!(
2669 "SUCCESS: Found {} rows with decimal points in price",
2670 view.row_count()
2671 );
2672 assert_eq!(view.row_count(), 2); }
2674 Err(e) => {
2675 panic!("Numeric Contains query failed: {e}");
2676 }
2677 }
2678
2679 println!("\n--- Test: quantity.Contains('0') ---");
2681 let result = engine.execute(
2682 table.clone(),
2683 "SELECT * FROM test WHERE quantity.Contains('0')",
2684 );
2685 match result {
2686 Ok(view) => {
2687 println!(
2688 "SUCCESS: Found {} rows with '0' in quantity",
2689 view.row_count()
2690 );
2691 assert_eq!(view.row_count(), 2); }
2693 Err(e) => {
2694 panic!("Integer Contains query failed: {e}");
2695 }
2696 }
2697
2698 println!("\n=== Numeric type coercion test complete! ===");
2699 }
2700
2701 #[test]
2702 fn test_datetime_comparisons() {
2703 let _ = tracing_subscriber::fmt()
2705 .with_max_level(tracing::Level::DEBUG)
2706 .try_init();
2707
2708 let mut table = DataTable::new("test");
2709 table.add_column(DataColumn::new("id"));
2710 table.add_column(DataColumn::new("created_date"));
2711
2712 table
2714 .add_row(DataRow::new(vec![
2715 DataValue::Integer(1),
2716 DataValue::String("2024-12-15".to_string()),
2717 ]))
2718 .unwrap();
2719
2720 table
2721 .add_row(DataRow::new(vec![
2722 DataValue::Integer(2),
2723 DataValue::String("2025-01-15".to_string()),
2724 ]))
2725 .unwrap();
2726
2727 table
2728 .add_row(DataRow::new(vec![
2729 DataValue::Integer(3),
2730 DataValue::String("2025-02-15".to_string()),
2731 ]))
2732 .unwrap();
2733
2734 let table = Arc::new(table);
2735 let engine = QueryEngine::new();
2736
2737 println!("\n=== Testing DateTime Comparisons ===");
2738 println!("Table has {} rows", table.row_count());
2739 for i in 0..table.row_count() {
2740 let date = table.get_value(i, 1);
2741 println!("Row {i}: created_date = {date:?}");
2742 }
2743
2744 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
2746 let result = engine.execute(
2747 table.clone(),
2748 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
2749 );
2750 match result {
2751 Ok(view) => {
2752 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
2753 assert_eq!(view.row_count(), 2); }
2755 Err(e) => {
2756 panic!("DateTime comparison query failed: {e}");
2757 }
2758 }
2759
2760 println!("\n=== DateTime comparison test complete! ===");
2761 }
2762
2763 #[test]
2764 fn test_not_with_method_calls() {
2765 let _ = tracing_subscriber::fmt()
2767 .with_max_level(tracing::Level::DEBUG)
2768 .try_init();
2769
2770 let mut table = DataTable::new("test");
2771 table.add_column(DataColumn::new("id"));
2772 table.add_column(DataColumn::new("status"));
2773
2774 table
2776 .add_row(DataRow::new(vec![
2777 DataValue::Integer(1),
2778 DataValue::String("Pending Review".to_string()),
2779 ]))
2780 .unwrap();
2781
2782 table
2783 .add_row(DataRow::new(vec![
2784 DataValue::Integer(2),
2785 DataValue::String("Complete".to_string()),
2786 ]))
2787 .unwrap();
2788
2789 table
2790 .add_row(DataRow::new(vec![
2791 DataValue::Integer(3),
2792 DataValue::String("Pending Approval".to_string()),
2793 ]))
2794 .unwrap();
2795
2796 let table = Arc::new(table);
2797 let engine = QueryEngine::with_case_insensitive(true);
2798
2799 println!("\n=== Testing NOT with Method Calls ===");
2800 println!("Table has {} rows", table.row_count());
2801 for i in 0..table.row_count() {
2802 let status = table.get_value(i, 1);
2803 println!("Row {i}: status = {status:?}");
2804 }
2805
2806 println!("\n--- Test: NOT status.Contains('pend') ---");
2808 let result = engine.execute(
2809 table.clone(),
2810 "SELECT * FROM test WHERE NOT status.Contains('pend')",
2811 );
2812 match result {
2813 Ok(view) => {
2814 println!(
2815 "SUCCESS: Found {} rows NOT containing 'pend'",
2816 view.row_count()
2817 );
2818 assert_eq!(view.row_count(), 1); }
2820 Err(e) => {
2821 panic!("NOT Contains query failed: {e}");
2822 }
2823 }
2824
2825 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
2827 let result = engine.execute(
2828 table.clone(),
2829 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
2830 );
2831 match result {
2832 Ok(view) => {
2833 println!(
2834 "SUCCESS: Found {} rows NOT starting with 'Pending'",
2835 view.row_count()
2836 );
2837 assert_eq!(view.row_count(), 1); }
2839 Err(e) => {
2840 panic!("NOT StartsWith query failed: {e}");
2841 }
2842 }
2843
2844 println!("\n=== NOT with method calls test complete! ===");
2845 }
2846
2847 #[test]
2848 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
2849 fn test_complex_logical_expressions() {
2850 let _ = tracing_subscriber::fmt()
2852 .with_max_level(tracing::Level::DEBUG)
2853 .try_init();
2854
2855 let mut table = DataTable::new("test");
2856 table.add_column(DataColumn::new("id"));
2857 table.add_column(DataColumn::new("status"));
2858 table.add_column(DataColumn::new("priority"));
2859 table.add_column(DataColumn::new("assigned"));
2860
2861 table
2863 .add_row(DataRow::new(vec![
2864 DataValue::Integer(1),
2865 DataValue::String("Pending".to_string()),
2866 DataValue::String("High".to_string()),
2867 DataValue::String("John".to_string()),
2868 ]))
2869 .unwrap();
2870
2871 table
2872 .add_row(DataRow::new(vec![
2873 DataValue::Integer(2),
2874 DataValue::String("Complete".to_string()),
2875 DataValue::String("High".to_string()),
2876 DataValue::String("Jane".to_string()),
2877 ]))
2878 .unwrap();
2879
2880 table
2881 .add_row(DataRow::new(vec![
2882 DataValue::Integer(3),
2883 DataValue::String("Pending".to_string()),
2884 DataValue::String("Low".to_string()),
2885 DataValue::String("John".to_string()),
2886 ]))
2887 .unwrap();
2888
2889 table
2890 .add_row(DataRow::new(vec![
2891 DataValue::Integer(4),
2892 DataValue::String("In Progress".to_string()),
2893 DataValue::String("Medium".to_string()),
2894 DataValue::String("Jane".to_string()),
2895 ]))
2896 .unwrap();
2897
2898 let table = Arc::new(table);
2899 let engine = QueryEngine::new();
2900
2901 println!("\n=== Testing Complex Logical Expressions ===");
2902 println!("Table has {} rows", table.row_count());
2903 for i in 0..table.row_count() {
2904 let status = table.get_value(i, 1);
2905 let priority = table.get_value(i, 2);
2906 let assigned = table.get_value(i, 3);
2907 println!(
2908 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
2909 );
2910 }
2911
2912 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
2914 let result = engine.execute(
2915 table.clone(),
2916 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
2917 );
2918 match result {
2919 Ok(view) => {
2920 println!(
2921 "SUCCESS: Found {} rows with complex logic",
2922 view.row_count()
2923 );
2924 assert_eq!(view.row_count(), 2); }
2926 Err(e) => {
2927 panic!("Complex logic query failed: {e}");
2928 }
2929 }
2930
2931 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
2933 let result = engine.execute(
2934 table.clone(),
2935 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
2936 );
2937 match result {
2938 Ok(view) => {
2939 println!(
2940 "SUCCESS: Found {} rows with NOT complex logic",
2941 view.row_count()
2942 );
2943 assert_eq!(view.row_count(), 2); }
2945 Err(e) => {
2946 panic!("NOT complex logic query failed: {e}");
2947 }
2948 }
2949
2950 println!("\n=== Complex logical expressions test complete! ===");
2951 }
2952
2953 #[test]
2954 fn test_mixed_data_types_and_edge_cases() {
2955 let _ = tracing_subscriber::fmt()
2957 .with_max_level(tracing::Level::DEBUG)
2958 .try_init();
2959
2960 let mut table = DataTable::new("test");
2961 table.add_column(DataColumn::new("id"));
2962 table.add_column(DataColumn::new("value"));
2963 table.add_column(DataColumn::new("nullable_field"));
2964
2965 table
2967 .add_row(DataRow::new(vec![
2968 DataValue::Integer(1),
2969 DataValue::String("123.45".to_string()),
2970 DataValue::String("present".to_string()),
2971 ]))
2972 .unwrap();
2973
2974 table
2975 .add_row(DataRow::new(vec![
2976 DataValue::Integer(2),
2977 DataValue::Float(678.90),
2978 DataValue::Null,
2979 ]))
2980 .unwrap();
2981
2982 table
2983 .add_row(DataRow::new(vec![
2984 DataValue::Integer(3),
2985 DataValue::Boolean(true),
2986 DataValue::String("also present".to_string()),
2987 ]))
2988 .unwrap();
2989
2990 table
2991 .add_row(DataRow::new(vec![
2992 DataValue::Integer(4),
2993 DataValue::String("false".to_string()),
2994 DataValue::Null,
2995 ]))
2996 .unwrap();
2997
2998 let table = Arc::new(table);
2999 let engine = QueryEngine::new();
3000
3001 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3002 println!("Table has {} rows", table.row_count());
3003 for i in 0..table.row_count() {
3004 let value = table.get_value(i, 1);
3005 let nullable = table.get_value(i, 2);
3006 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3007 }
3008
3009 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3011 let result = engine.execute(
3012 table.clone(),
3013 "SELECT * FROM test WHERE value.Contains('true')",
3014 );
3015 match result {
3016 Ok(view) => {
3017 println!(
3018 "SUCCESS: Found {} rows with boolean coercion",
3019 view.row_count()
3020 );
3021 assert_eq!(view.row_count(), 1); }
3023 Err(e) => {
3024 panic!("Boolean coercion query failed: {e}");
3025 }
3026 }
3027
3028 println!("\n--- Test: id IN (1, 3) ---");
3030 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3031 match result {
3032 Ok(view) => {
3033 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3034 assert_eq!(view.row_count(), 2); }
3036 Err(e) => {
3037 panic!("Multiple IN values query failed: {e}");
3038 }
3039 }
3040
3041 println!("\n=== Mixed data types test complete! ===");
3042 }
3043
3044 #[test]
3046 fn test_aggregate_only_single_row() {
3047 let table = create_test_stock_data();
3048 let engine = QueryEngine::new();
3049
3050 let result = engine
3052 .execute(
3053 table.clone(),
3054 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3055 )
3056 .expect("Query should succeed");
3057
3058 assert_eq!(
3059 result.row_count(),
3060 1,
3061 "Aggregate-only query should return exactly 1 row"
3062 );
3063 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3064
3065 let source = result.source();
3067 let row = source.get_row(0).expect("Should have first row");
3068
3069 assert_eq!(row.values[0], DataValue::Integer(5));
3071
3072 assert_eq!(row.values[1], DataValue::Float(99.5));
3074
3075 assert_eq!(row.values[2], DataValue::Float(105.0));
3077
3078 if let DataValue::Float(avg) = &row.values[3] {
3080 assert!(
3081 (avg - 102.4).abs() < 0.01,
3082 "Average should be approximately 102.4, got {}",
3083 avg
3084 );
3085 } else {
3086 panic!("AVG should return a Float value");
3087 }
3088 }
3089
3090 #[test]
3092 fn test_single_aggregate_single_row() {
3093 let table = create_test_stock_data();
3094 let engine = QueryEngine::new();
3095
3096 let result = engine
3097 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3098 .expect("Query should succeed");
3099
3100 assert_eq!(
3101 result.row_count(),
3102 1,
3103 "Single aggregate query should return exactly 1 row"
3104 );
3105 assert_eq!(result.column_count(), 1, "Should have 1 column");
3106
3107 let source = result.source();
3108 let row = source.get_row(0).expect("Should have first row");
3109 assert_eq!(row.values[0], DataValue::Integer(5));
3110 }
3111
3112 #[test]
3114 fn test_aggregate_with_where_single_row() {
3115 let table = create_test_stock_data();
3116 let engine = QueryEngine::new();
3117
3118 let result = engine
3120 .execute(
3121 table.clone(),
3122 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3123 )
3124 .expect("Query should succeed");
3125
3126 assert_eq!(
3127 result.row_count(),
3128 1,
3129 "Filtered aggregate query should return exactly 1 row"
3130 );
3131 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3132
3133 let source = result.source();
3134 let row = source.get_row(0).expect("Should have first row");
3135
3136 assert_eq!(row.values[0], DataValue::Integer(2));
3138 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
3141
3142 #[test]
3143 fn test_not_in_parsing() {
3144 use crate::sql::recursive_parser::Parser;
3145
3146 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3147 println!("\n=== Testing NOT IN parsing ===");
3148 println!("Parsing query: {query}");
3149
3150 let mut parser = Parser::new(query);
3151 match parser.parse() {
3152 Ok(statement) => {
3153 println!("Parsed statement: {statement:#?}");
3154 if let Some(where_clause) = statement.where_clause {
3155 println!("WHERE conditions: {:#?}", where_clause.conditions);
3156 if let Some(first_condition) = where_clause.conditions.first() {
3157 println!("First condition expression: {:#?}", first_condition.expr);
3158 }
3159 }
3160 }
3161 Err(e) => {
3162 panic!("Parse error: {e}");
3163 }
3164 }
3165 }
3166
3167 fn create_test_stock_data() -> Arc<DataTable> {
3169 let mut table = DataTable::new("stock");
3170
3171 table.add_column(DataColumn::new("symbol"));
3172 table.add_column(DataColumn::new("close"));
3173 table.add_column(DataColumn::new("volume"));
3174
3175 let test_data = vec![
3177 ("AAPL", 99.5, 1000),
3178 ("AAPL", 101.2, 1500),
3179 ("AAPL", 103.5, 2000),
3180 ("AAPL", 105.0, 1200),
3181 ("AAPL", 102.8, 1800),
3182 ];
3183
3184 for (symbol, close, volume) in test_data {
3185 table
3186 .add_row(DataRow::new(vec![
3187 DataValue::String(symbol.to_string()),
3188 DataValue::Float(close),
3189 DataValue::Integer(volume),
3190 ]))
3191 .expect("Should add row successfully");
3192 }
3193
3194 Arc::new(table)
3195 }
3196}
3197
3198#[cfg(test)]
3199#[path = "query_engine_tests.rs"]
3200mod query_engine_tests;