1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::TableSource;
25use crate::sql::recursive_parser::{
26 CTEType, OrderByColumn, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
27 TableFunction,
28};
29
30#[derive(Debug, Clone)]
32pub struct ExecutionContext {
33 alias_map: HashMap<String, String>,
36}
37
38impl ExecutionContext {
39 pub fn new() -> Self {
41 Self {
42 alias_map: HashMap::new(),
43 }
44 }
45
46 pub fn register_alias(&mut self, alias: String, table_name: String) {
48 debug!("Registering alias: {} -> {}", alias, table_name);
49 self.alias_map.insert(alias, table_name);
50 }
51
52 pub fn resolve_alias(&self, name: &str) -> String {
55 self.alias_map
56 .get(name)
57 .cloned()
58 .unwrap_or_else(|| name.to_string())
59 }
60
61 pub fn is_alias(&self, name: &str) -> bool {
63 self.alias_map.contains_key(name)
64 }
65
66 pub fn get_aliases(&self) -> HashMap<String, String> {
68 self.alias_map.clone()
69 }
70
71 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
86 if let Some(table_prefix) = &column_ref.table_prefix {
87 let actual_table = self.resolve_alias(table_prefix);
89
90 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
92 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
93 debug!(
94 "Resolved {}.{} -> qualified column '{}' at index {}",
95 table_prefix, column_ref.name, qualified_name, idx
96 );
97 return Ok(idx);
98 }
99
100 if let Some(idx) = table.get_column_index(&column_ref.name) {
102 debug!(
103 "Resolved {}.{} -> unqualified column '{}' at index {}",
104 table_prefix, column_ref.name, column_ref.name, idx
105 );
106 return Ok(idx);
107 }
108
109 Err(anyhow!(
111 "Column '{}' not found. Table '{}' may not support qualified column names",
112 qualified_name,
113 actual_table
114 ))
115 } else {
116 if let Some(idx) = table.get_column_index(&column_ref.name) {
118 debug!(
119 "Resolved unqualified column '{}' at index {}",
120 column_ref.name, idx
121 );
122 return Ok(idx);
123 }
124
125 if column_ref.name.contains('.') {
127 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
128 debug!(
129 "Resolved '{}' as qualified column at index {}",
130 column_ref.name, idx
131 );
132 return Ok(idx);
133 }
134 }
135
136 let suggestion = self.find_similar_column(table, &column_ref.name);
138 match suggestion {
139 Some(similar) => Err(anyhow!(
140 "Column '{}' not found. Did you mean '{}'?",
141 column_ref.name,
142 similar
143 )),
144 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
145 }
146 }
147 }
148
149 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
151 let columns = table.column_names();
152 let mut best_match: Option<(String, usize)> = None;
153
154 for col in columns {
155 let distance = edit_distance(name, &col);
156 if distance <= 2 {
157 match best_match {
159 Some((_, best_dist)) if distance < best_dist => {
160 best_match = Some((col.clone(), distance));
161 }
162 None => {
163 best_match = Some((col.clone(), distance));
164 }
165 _ => {}
166 }
167 }
168 }
169
170 best_match.map(|(name, _)| name)
171 }
172}
173
174impl Default for ExecutionContext {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
/// Levenshtein distance between `a` and `b`, counted in `char`s.
///
/// Insertions, deletions and substitutions each cost 1. Only two rows of the
/// dynamic-programming table are kept alive at a time.
fn edit_distance(a: &str, b: &str) -> usize {
    let a_chars: Vec<char> = a.chars().collect();
    let b_chars: Vec<char> = b.chars().collect();

    if a_chars.is_empty() {
        return b_chars.len();
    }
    if b_chars.is_empty() {
        return a_chars.len();
    }

    // `prev[j]`: distance between the first `i` chars of `a` and the first `j` chars of `b`.
    let mut prev: Vec<usize> = (0..=b_chars.len()).collect();
    let mut curr: Vec<usize> = vec![0; b_chars.len() + 1];

    for (i, &ca) in a_chars.iter().enumerate() {
        curr[0] = i + 1;
        for (j, &cb) in b_chars.iter().enumerate() {
            let substitution = prev[j] + usize::from(ca != cb);
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            curr[j + 1] = substitution.min(deletion).min(insertion);
        }
        std::mem::swap(&mut prev, &mut curr);
    }

    prev[b_chars.len()]
}
220
/// Executes SQL `SELECT` statements against in-memory `DataTable`s and returns
/// the results as `DataView`s.
#[derive(Clone)]
pub struct QueryEngine {
    /// Forwarded to the hash-join executor and the WHERE-clause evaluators.
    case_insensitive: bool,
    /// Date notation handed to the arithmetic evaluator (e.g. for table-function
    /// arguments). Every constructor visible here takes it from
    /// `get_date_notation()`.
    date_notation: String,
    /// Configuration supplied through `with_behavior_config`, if any.
    /// NOTE(review): not read elsewhere in the visible code.
    _behavior_config: Option<BehaviorConfig>,
}
228
229impl Default for QueryEngine {
230 fn default() -> Self {
231 Self::new()
232 }
233}
234
235impl QueryEngine {
236 #[must_use]
237 pub fn new() -> Self {
238 Self {
239 case_insensitive: false,
240 date_notation: get_date_notation(),
241 _behavior_config: None,
242 }
243 }
244
245 #[must_use]
246 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
247 let case_insensitive = config.case_insensitive_default;
248 let date_notation = get_date_notation();
250 Self {
251 case_insensitive,
252 date_notation,
253 _behavior_config: Some(config),
254 }
255 }
256
257 #[must_use]
258 pub fn with_date_notation(_date_notation: String) -> Self {
259 Self {
260 case_insensitive: false,
261 date_notation: get_date_notation(), _behavior_config: None,
263 }
264 }
265
266 #[must_use]
267 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
268 Self {
269 case_insensitive,
270 date_notation: get_date_notation(),
271 _behavior_config: None,
272 }
273 }
274
275 #[must_use]
276 pub fn with_case_insensitive_and_date_notation(
277 case_insensitive: bool,
278 _date_notation: String, ) -> Self {
280 Self {
281 case_insensitive,
282 date_notation: get_date_notation(), _behavior_config: None,
284 }
285 }
286
287 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
289 let columns = table.column_names();
290 let mut best_match: Option<(String, usize)> = None;
291
292 for col in columns {
293 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
294 let max_distance = if name.len() > 10 { 3 } else { 2 };
297 if distance <= max_distance {
298 match &best_match {
299 None => best_match = Some((col, distance)),
300 Some((_, best_dist)) if distance < *best_dist => {
301 best_match = Some((col, distance));
302 }
303 _ => {}
304 }
305 }
306 }
307
308 best_match.map(|(name, _)| name)
309 }
310
311 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
313 let len1 = s1.len();
314 let len2 = s2.len();
315 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
316
317 for i in 0..=len1 {
318 matrix[i][0] = i;
319 }
320 for j in 0..=len2 {
321 matrix[0][j] = j;
322 }
323
324 for (i, c1) in s1.chars().enumerate() {
325 for (j, c2) in s2.chars().enumerate() {
326 let cost = usize::from(c1 != c2);
327 matrix[i + 1][j + 1] = std::cmp::min(
328 matrix[i][j + 1] + 1, std::cmp::min(
330 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
333 );
334 }
335 }
336
337 matrix[len1][len2]
338 }
339
340 fn contains_unnest(expr: &SqlExpression) -> bool {
342 match expr {
343 SqlExpression::Unnest { .. } => true,
345 SqlExpression::FunctionCall { name, args, .. } => {
346 if name.to_uppercase() == "UNNEST" {
347 return true;
348 }
349 args.iter().any(Self::contains_unnest)
351 }
352 SqlExpression::BinaryOp { left, right, .. } => {
353 Self::contains_unnest(left) || Self::contains_unnest(right)
354 }
355 SqlExpression::Not { expr } => Self::contains_unnest(expr),
356 SqlExpression::CaseExpression {
357 when_branches,
358 else_branch,
359 } => {
360 when_branches.iter().any(|branch| {
361 Self::contains_unnest(&branch.condition)
362 || Self::contains_unnest(&branch.result)
363 }) || else_branch
364 .as_ref()
365 .map_or(false, |e| Self::contains_unnest(e))
366 }
367 SqlExpression::SimpleCaseExpression {
368 expr,
369 when_branches,
370 else_branch,
371 } => {
372 Self::contains_unnest(expr)
373 || when_branches.iter().any(|branch| {
374 Self::contains_unnest(&branch.value)
375 || Self::contains_unnest(&branch.result)
376 })
377 || else_branch
378 .as_ref()
379 .map_or(false, |e| Self::contains_unnest(e))
380 }
381 SqlExpression::InList { expr, values } => {
382 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
383 }
384 SqlExpression::NotInList { expr, values } => {
385 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
386 }
387 SqlExpression::Between { expr, lower, upper } => {
388 Self::contains_unnest(expr)
389 || Self::contains_unnest(lower)
390 || Self::contains_unnest(upper)
391 }
392 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
393 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
394 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
396 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
397 SqlExpression::ChainedMethodCall { base, args, .. } => {
398 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
399 }
400 SqlExpression::Unnest { .. } => true, _ => false,
402 }
403 }
404
405 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
407 let (view, _plan) = self.execute_with_plan(table, sql)?;
408 Ok(view)
409 }
410
411 pub fn execute_with_temp_tables(
413 &self,
414 table: Arc<DataTable>,
415 sql: &str,
416 temp_tables: Option<&TempTableRegistry>,
417 ) -> Result<DataView> {
418 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
419 Ok(view)
420 }
421
422 pub fn execute_statement(
424 &self,
425 table: Arc<DataTable>,
426 statement: SelectStatement,
427 ) -> Result<DataView> {
428 self.execute_statement_with_temp_tables(table, statement, None)
429 }
430
431 pub fn execute_statement_with_temp_tables(
433 &self,
434 table: Arc<DataTable>,
435 statement: SelectStatement,
436 temp_tables: Option<&TempTableRegistry>,
437 ) -> Result<DataView> {
438 let mut cte_context = HashMap::new();
440
441 if let Some(temp_registry) = temp_tables {
443 for table_name in temp_registry.list_tables() {
444 if let Some(temp_table) = temp_registry.get(&table_name) {
445 debug!("Adding temp table {} to CTE context", table_name);
446 let view = DataView::new(temp_table);
447 cte_context.insert(table_name, Arc::new(view));
448 }
449 }
450 }
451
452 for cte in &statement.ctes {
453 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
454 let cte_result = match &cte.cte_type {
456 CTEType::Standard(query) => {
457 let view = self.build_view_with_context(
459 table.clone(),
460 query.clone(),
461 &mut cte_context,
462 )?;
463
464 let mut materialized = self.materialize_view(view)?;
466
467 for column in materialized.columns_mut() {
469 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
470 column.source_table = Some(cte.name.clone());
471 }
472
473 DataView::new(Arc::new(materialized))
474 }
475 CTEType::Web(web_spec) => {
476 use crate::web::http_fetcher::WebDataFetcher;
478
479 let fetcher = WebDataFetcher::new()?;
480 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
482
483 for column in data_table.columns_mut() {
485 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
486 column.source_table = Some(cte.name.clone());
487 }
488
489 DataView::new(Arc::new(data_table))
491 }
492 };
493 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
495 debug!(
496 "QueryEngine: CTE '{}' pre-processed, stored in context",
497 cte.name
498 );
499 }
500
501 let mut subquery_executor =
503 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
504 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
505
506 self.build_view_with_context(table, processed_statement, &mut cte_context)
508 }
509
510 pub fn execute_statement_with_cte_context(
512 &self,
513 table: Arc<DataTable>,
514 statement: SelectStatement,
515 cte_context: &HashMap<String, Arc<DataView>>,
516 ) -> Result<DataView> {
517 let mut local_context = cte_context.clone();
519
520 for cte in &statement.ctes {
522 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
523 let cte_result = match &cte.cte_type {
524 CTEType::Standard(query) => {
525 let view = self.build_view_with_context(
526 table.clone(),
527 query.clone(),
528 &mut local_context,
529 )?;
530
531 let mut materialized = self.materialize_view(view)?;
533
534 for column in materialized.columns_mut() {
536 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
537 column.source_table = Some(cte.name.clone());
538 }
539
540 DataView::new(Arc::new(materialized))
541 }
542 CTEType::Web(web_spec) => {
543 use crate::web::http_fetcher::WebDataFetcher;
545
546 let fetcher = WebDataFetcher::new()?;
547 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
549
550 for column in data_table.columns_mut() {
552 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
553 column.source_table = Some(cte.name.clone());
554 }
555
556 DataView::new(Arc::new(data_table))
558 }
559 };
560 local_context.insert(cte.name.clone(), Arc::new(cte_result));
561 }
562
563 let mut subquery_executor =
565 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
566 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
567
568 self.build_view_with_context(table, processed_statement, &mut local_context)
570 }
571
572 pub fn execute_with_plan(
574 &self,
575 table: Arc<DataTable>,
576 sql: &str,
577 ) -> Result<(DataView, ExecutionPlan)> {
578 self.execute_with_plan_and_temp_tables(table, sql, None)
579 }
580
581 pub fn execute_with_plan_and_temp_tables(
583 &self,
584 table: Arc<DataTable>,
585 sql: &str,
586 temp_tables: Option<&TempTableRegistry>,
587 ) -> Result<(DataView, ExecutionPlan)> {
588 let mut plan_builder = ExecutionPlanBuilder::new();
589 let start_time = Instant::now();
590
591 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
593 plan_builder.add_detail(format!("Query: {}", sql));
594 let mut parser = Parser::new(sql);
595 let statement = parser
596 .parse()
597 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
598 plan_builder.add_detail(format!("Parsed successfully"));
599 if let Some(from) = &statement.from_table {
600 plan_builder.add_detail(format!("FROM: {}", from));
601 }
602 if statement.where_clause.is_some() {
603 plan_builder.add_detail("WHERE clause present".to_string());
604 }
605 plan_builder.end_step();
606
607 let mut cte_context = HashMap::new();
609
610 if let Some(temp_registry) = temp_tables {
612 for table_name in temp_registry.list_tables() {
613 if let Some(temp_table) = temp_registry.get(&table_name) {
614 debug!("Adding temp table {} to CTE context", table_name);
615 let view = DataView::new(temp_table);
616 cte_context.insert(table_name, Arc::new(view));
617 }
618 }
619 }
620
621 if !statement.ctes.is_empty() {
622 plan_builder.begin_step(
623 StepType::CTE,
624 format!("Process {} CTEs", statement.ctes.len()),
625 );
626
627 for cte in &statement.ctes {
628 let cte_start = Instant::now();
629 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
630
631 let cte_result = match &cte.cte_type {
632 CTEType::Standard(query) => {
633 if let Some(from) = &query.from_table {
635 plan_builder.add_detail(format!("Source: {}", from));
636 }
637 if query.where_clause.is_some() {
638 plan_builder.add_detail("Has WHERE clause".to_string());
639 }
640 if query.group_by.is_some() {
641 plan_builder.add_detail("Has GROUP BY".to_string());
642 }
643
644 debug!(
645 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
646 cte.name,
647 cte_context.keys().collect::<Vec<_>>()
648 );
649
650 let mut subquery_executor = SubqueryExecutor::with_cte_context(
653 self.clone(),
654 table.clone(),
655 cte_context.clone(),
656 );
657 let processed_query = subquery_executor.execute_subqueries(query)?;
658
659 let view = self.build_view_with_context(
660 table.clone(),
661 processed_query,
662 &mut cte_context,
663 )?;
664
665 let mut materialized = self.materialize_view(view)?;
667
668 for column in materialized.columns_mut() {
670 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
671 column.source_table = Some(cte.name.clone());
672 }
673
674 DataView::new(Arc::new(materialized))
675 }
676 CTEType::Web(web_spec) => {
677 plan_builder.add_detail(format!("URL: {}", web_spec.url));
678 if let Some(format) = &web_spec.format {
679 plan_builder.add_detail(format!("Format: {:?}", format));
680 }
681 if let Some(cache) = web_spec.cache_seconds {
682 plan_builder.add_detail(format!("Cache: {} seconds", cache));
683 }
684
685 use crate::web::http_fetcher::WebDataFetcher;
687
688 let fetcher = WebDataFetcher::new()?;
689 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
691
692 for column in data_table.columns_mut() {
694 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
695 column.source_table = Some(cte.name.clone());
696 }
697
698 DataView::new(Arc::new(data_table))
700 }
701 };
702
703 plan_builder.set_rows_out(cte_result.row_count());
705 plan_builder.add_detail(format!(
706 "Result: {} rows, {} columns",
707 cte_result.row_count(),
708 cte_result.column_count()
709 ));
710 plan_builder.add_detail(format!(
711 "Execution time: {:.3}ms",
712 cte_start.elapsed().as_secs_f64() * 1000.0
713 ));
714
715 debug!(
716 "QueryEngine: Storing CTE '{}' in context with {} rows",
717 cte.name,
718 cte_result.row_count()
719 );
720 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
721 plan_builder.end_step();
722 }
723
724 plan_builder.add_detail(format!(
725 "All {} CTEs cached in context",
726 statement.ctes.len()
727 ));
728 plan_builder.end_step();
729 }
730
731 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
733 let mut subquery_executor =
734 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
735
736 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
738 format!("{:?}", w).contains("Subquery")
740 });
741
742 if has_subqueries {
743 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
744 }
745
746 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
747
748 if has_subqueries {
749 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
750 } else {
751 plan_builder.add_detail("No subqueries to process".to_string());
752 }
753
754 plan_builder.end_step();
755 let result = self.build_view_with_context_and_plan(
756 table,
757 processed_statement,
758 &mut cte_context,
759 &mut plan_builder,
760 )?;
761
762 let total_duration = start_time.elapsed();
763 info!(
764 "Query execution complete: total={:?}, rows={}",
765 total_duration,
766 result.row_count()
767 );
768
769 let plan = plan_builder.build();
770 Ok((result, plan))
771 }
772
773 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
775 let mut cte_context = HashMap::new();
776 self.build_view_with_context(table, statement, &mut cte_context)
777 }
778
779 fn build_view_with_context(
781 &self,
782 table: Arc<DataTable>,
783 statement: SelectStatement,
784 cte_context: &mut HashMap<String, Arc<DataView>>,
785 ) -> Result<DataView> {
786 let mut dummy_plan = ExecutionPlanBuilder::new();
787 let mut exec_context = ExecutionContext::new();
788 self.build_view_with_context_and_plan_and_exec(
789 table,
790 statement,
791 cte_context,
792 &mut dummy_plan,
793 &mut exec_context,
794 )
795 }
796
797 fn build_view_with_context_and_plan(
799 &self,
800 table: Arc<DataTable>,
801 statement: SelectStatement,
802 cte_context: &mut HashMap<String, Arc<DataView>>,
803 plan: &mut ExecutionPlanBuilder,
804 ) -> Result<DataView> {
805 let mut exec_context = ExecutionContext::new();
806 self.build_view_with_context_and_plan_and_exec(
807 table,
808 statement,
809 cte_context,
810 plan,
811 &mut exec_context,
812 )
813 }
814
815 fn build_view_with_context_and_plan_and_exec(
817 &self,
818 table: Arc<DataTable>,
819 statement: SelectStatement,
820 cte_context: &mut HashMap<String, Arc<DataView>>,
821 plan: &mut ExecutionPlanBuilder,
822 exec_context: &mut ExecutionContext,
823 ) -> Result<DataView> {
824 for cte in &statement.ctes {
826 if cte_context.contains_key(&cte.name) {
828 debug!(
829 "QueryEngine: CTE '{}' already in context, skipping",
830 cte.name
831 );
832 continue;
833 }
834
835 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
836 debug!(
837 "QueryEngine: Available CTEs for '{}': {:?}",
838 cte.name,
839 cte_context.keys().collect::<Vec<_>>()
840 );
841
842 let cte_result = match &cte.cte_type {
844 CTEType::Standard(query) => {
845 let view =
846 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
847
848 let mut materialized = self.materialize_view(view)?;
850
851 for column in materialized.columns_mut() {
853 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
854 column.source_table = Some(cte.name.clone());
855 }
856
857 DataView::new(Arc::new(materialized))
858 }
859 CTEType::Web(_web_spec) => {
860 return Err(anyhow!(
862 "Web CTEs should be processed in execute_select method"
863 ));
864 }
865 };
866
867 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
869 debug!(
870 "QueryEngine: CTE '{}' processed, stored in context",
871 cte.name
872 );
873 }
874
875 let source_table = if let Some(ref table_func) = statement.from_function {
877 debug!("QueryEngine: Processing table function...");
879 match table_func {
880 TableFunction::Generator { name, args } => {
881 use crate::sql::generators::GeneratorRegistry;
883
884 let registry = GeneratorRegistry::new();
886
887 if let Some(generator) = registry.get(name) {
888 let mut evaluator = ArithmeticEvaluator::with_date_notation(
890 &table,
891 self.date_notation.clone(),
892 );
893 let dummy_row = 0;
894
895 let mut evaluated_args = Vec::new();
896 for arg in args {
897 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
898 }
899
900 generator.generate(evaluated_args)?
902 } else {
903 return Err(anyhow!("Unknown generator function: {}", name));
904 }
905 }
906 }
907 } else if let Some(ref subquery) = statement.from_subquery {
908 debug!("QueryEngine: Processing FROM subquery...");
910 let subquery_result =
911 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
912
913 let materialized = self.materialize_view(subquery_result)?;
916 Arc::new(materialized)
917 } else if let Some(ref table_name) = statement.from_table {
918 if let Some(cte_view) = cte_context.get(table_name) {
920 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
921 let mut materialized = self.materialize_view((**cte_view).clone())?;
923
924 if let Some(ref alias) = statement.from_alias {
926 debug!(
927 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
928 alias, table_name
929 );
930 for column in materialized.columns_mut() {
931 if let Some(ref qualified_name) = column.qualified_name {
933 if qualified_name.starts_with(&format!("{}.", table_name)) {
934 column.qualified_name =
935 Some(qualified_name.replace(
936 &format!("{}.", table_name),
937 &format!("{}.", alias),
938 ));
939 }
940 }
941 if column.source_table.as_ref() == Some(table_name) {
943 column.source_table = Some(alias.clone());
944 }
945 }
946 }
947
948 Arc::new(materialized)
949 } else {
950 table.clone()
952 }
953 } else {
954 table.clone()
956 };
957
958 if let Some(ref alias) = statement.from_alias {
960 if let Some(ref table_name) = statement.from_table {
961 exec_context.register_alias(alias.clone(), table_name.clone());
962 }
963 }
964
965 let final_table = if !statement.joins.is_empty() {
967 plan.begin_step(
968 StepType::Join,
969 format!("Process {} JOINs", statement.joins.len()),
970 );
971 plan.set_rows_in(source_table.row_count());
972
973 let join_executor = HashJoinExecutor::new(self.case_insensitive);
974 let mut current_table = source_table;
975
976 for (idx, join_clause) in statement.joins.iter().enumerate() {
977 let join_start = Instant::now();
978 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
979 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
980 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
981 plan.add_detail(format!(
982 "Executing {:?} JOIN on {} condition(s)",
983 join_clause.join_type,
984 join_clause.condition.conditions.len()
985 ));
986
987 let right_table = match &join_clause.table {
989 TableSource::Table(name) => {
990 if let Some(cte_view) = cte_context.get(name) {
992 let mut materialized = self.materialize_view((**cte_view).clone())?;
993
994 if let Some(ref alias) = join_clause.alias {
996 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
997 for column in materialized.columns_mut() {
998 if let Some(ref qualified_name) = column.qualified_name {
1000 if qualified_name.starts_with(&format!("{}.", name)) {
1001 column.qualified_name = Some(qualified_name.replace(
1002 &format!("{}.", name),
1003 &format!("{}.", alias),
1004 ));
1005 }
1006 }
1007 if column.source_table.as_ref() == Some(name) {
1009 column.source_table = Some(alias.clone());
1010 }
1011 }
1012 }
1013
1014 Arc::new(materialized)
1015 } else {
1016 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1019 }
1020 }
1021 TableSource::DerivedTable { query, alias: _ } => {
1022 let subquery_result = self.build_view_with_context(
1024 table.clone(),
1025 *query.clone(),
1026 cte_context,
1027 )?;
1028 let materialized = self.materialize_view(subquery_result)?;
1029 Arc::new(materialized)
1030 }
1031 };
1032
1033 let joined = join_executor.execute_join(
1035 current_table.clone(),
1036 join_clause,
1037 right_table.clone(),
1038 )?;
1039
1040 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1041 plan.set_rows_out(joined.row_count());
1042 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1043 plan.add_detail(format!(
1044 "Join time: {:.3}ms",
1045 join_start.elapsed().as_secs_f64() * 1000.0
1046 ));
1047 plan.end_step();
1048
1049 current_table = Arc::new(joined);
1050 }
1051
1052 plan.set_rows_out(current_table.row_count());
1053 plan.add_detail(format!(
1054 "Final result after all joins: {} rows",
1055 current_table.row_count()
1056 ));
1057 plan.end_step();
1058 current_table
1059 } else {
1060 source_table
1061 };
1062
1063 self.build_view_internal_with_plan_and_exec(
1065 final_table,
1066 statement,
1067 plan,
1068 Some(exec_context),
1069 )
1070 }
1071
1072 fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1074 let source = view.source();
1075 let mut result_table = DataTable::new("derived");
1076
1077 let visible_cols = view.visible_column_indices().to_vec();
1079
1080 for col_idx in &visible_cols {
1082 let col = &source.columns[*col_idx];
1083 let new_col = DataColumn {
1084 name: col.name.clone(),
1085 data_type: col.data_type.clone(),
1086 nullable: col.nullable,
1087 unique_values: col.unique_values,
1088 null_count: col.null_count,
1089 metadata: col.metadata.clone(),
1090 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1093 result_table.add_column(new_col);
1094 }
1095
1096 for row_idx in view.visible_row_indices() {
1098 let source_row = &source.rows[*row_idx];
1099 let mut new_row = DataRow { values: Vec::new() };
1100
1101 for col_idx in &visible_cols {
1102 new_row.values.push(source_row.values[*col_idx].clone());
1103 }
1104
1105 result_table.add_row(new_row);
1106 }
1107
1108 Ok(result_table)
1109 }
1110
1111 fn build_view_internal(
1112 &self,
1113 table: Arc<DataTable>,
1114 statement: SelectStatement,
1115 ) -> Result<DataView> {
1116 let mut dummy_plan = ExecutionPlanBuilder::new();
1117 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1118 }
1119
1120 fn build_view_internal_with_plan(
1121 &self,
1122 table: Arc<DataTable>,
1123 statement: SelectStatement,
1124 plan: &mut ExecutionPlanBuilder,
1125 ) -> Result<DataView> {
1126 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1127 }
1128
1129 fn build_view_internal_with_plan_and_exec(
1130 &self,
1131 table: Arc<DataTable>,
1132 statement: SelectStatement,
1133 plan: &mut ExecutionPlanBuilder,
1134 exec_context: Option<&ExecutionContext>,
1135 ) -> Result<DataView> {
1136 debug!(
1137 "QueryEngine::build_view - select_items: {:?}",
1138 statement.select_items
1139 );
1140 debug!(
1141 "QueryEngine::build_view - where_clause: {:?}",
1142 statement.where_clause
1143 );
1144
1145 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1147
1148 if let Some(where_clause) = &statement.where_clause {
1150 let total_rows = table.row_count();
1151 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1152 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1153
1154 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1155 plan.set_rows_in(total_rows);
1156 plan.add_detail(format!("Input: {} rows", total_rows));
1157
1158 for condition in &where_clause.conditions {
1160 plan.add_detail(format!("Condition: {:?}", condition.expr));
1161 }
1162
1163 let filter_start = Instant::now();
1164 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1166
1167 let mut filtered_rows = Vec::new();
1169 for row_idx in visible_rows {
1170 if row_idx < 3 {
1172 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1173 }
1174
1175 let mut evaluator = if let Some(exec_ctx) = exec_context {
1177 RecursiveWhereEvaluator::with_exec_context(
1178 &table,
1179 exec_ctx,
1180 self.case_insensitive,
1181 )
1182 } else {
1183 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1184 };
1185
1186 match evaluator.evaluate(where_clause, row_idx) {
1187 Ok(result) => {
1188 if row_idx < 3 {
1189 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1190 }
1191 if result {
1192 filtered_rows.push(row_idx);
1193 }
1194 }
1195 Err(e) => {
1196 if row_idx < 3 {
1197 debug!(
1198 "QueryEngine: WHERE evaluation error for row {}: {}",
1199 row_idx, e
1200 );
1201 }
1202 return Err(e);
1204 }
1205 }
1206 }
1207
1208 let (compilations, cache_hits) = eval_context.get_stats();
1210 if compilations > 0 || cache_hits > 0 {
1211 debug!(
1212 "LIKE pattern cache: {} compilations, {} cache hits",
1213 compilations, cache_hits
1214 );
1215 }
1216 visible_rows = filtered_rows;
1217 let filter_duration = filter_start.elapsed();
1218 info!(
1219 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1220 total_rows,
1221 visible_rows.len(),
1222 filter_duration
1223 );
1224
1225 plan.set_rows_out(visible_rows.len());
1226 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1227 plan.add_detail(format!(
1228 "Filter time: {:.3}ms",
1229 filter_duration.as_secs_f64() * 1000.0
1230 ));
1231 plan.end_step();
1232 }
1233
1234 let mut view = DataView::new(table.clone());
1236 view = view.with_rows(visible_rows);
1237
1238 if let Some(group_by_exprs) = &statement.group_by {
1240 if !group_by_exprs.is_empty() {
1241 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1242
1243 plan.begin_step(
1244 StepType::GroupBy,
1245 format!("GROUP BY {} expressions", group_by_exprs.len()),
1246 );
1247 plan.set_rows_in(view.row_count());
1248 plan.add_detail(format!("Input: {} rows", view.row_count()));
1249 for expr in group_by_exprs {
1250 plan.add_detail(format!("Group by: {:?}", expr));
1251 }
1252
1253 let group_start = Instant::now();
1254 view = self.apply_group_by(
1255 view,
1256 group_by_exprs,
1257 &statement.select_items,
1258 statement.having.as_ref(),
1259 plan,
1260 )?;
1261
1262 plan.set_rows_out(view.row_count());
1263 plan.add_detail(format!("Output: {} groups", view.row_count()));
1264 plan.add_detail(format!(
1265 "Overall time: {:.3}ms",
1266 group_start.elapsed().as_secs_f64() * 1000.0
1267 ));
1268 plan.end_step();
1269 }
1270 } else {
1271 if !statement.select_items.is_empty() {
1273 let has_non_star_items = statement
1275 .select_items
1276 .iter()
1277 .any(|item| !matches!(item, SelectItem::Star));
1278
1279 if has_non_star_items || statement.select_items.len() > 1 {
1283 view = self.apply_select_items(
1284 view,
1285 &statement.select_items,
1286 &statement,
1287 exec_context,
1288 )?;
1289 } else {
1290 }
1291 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1293 debug!("QueryEngine: Using legacy columns path");
1294 let source_table = view.source();
1297 let column_indices =
1298 self.resolve_column_indices(source_table, &statement.columns)?;
1299 view = view.with_columns(column_indices);
1300 }
1301 }
1302
1303 if statement.distinct {
1305 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1306 plan.set_rows_in(view.row_count());
1307 plan.add_detail(format!("Input: {} rows", view.row_count()));
1308
1309 let distinct_start = Instant::now();
1310 view = self.apply_distinct(view)?;
1311
1312 plan.set_rows_out(view.row_count());
1313 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1314 plan.add_detail(format!(
1315 "Distinct time: {:.3}ms",
1316 distinct_start.elapsed().as_secs_f64() * 1000.0
1317 ));
1318 plan.end_step();
1319 }
1320
1321 if let Some(order_by_columns) = &statement.order_by {
1323 if !order_by_columns.is_empty() {
1324 plan.begin_step(
1325 StepType::Sort,
1326 format!("ORDER BY {} columns", order_by_columns.len()),
1327 );
1328 plan.set_rows_in(view.row_count());
1329 for col in order_by_columns {
1330 plan.add_detail(format!("{} {:?}", col.column, col.direction));
1331 }
1332
1333 let sort_start = Instant::now();
1334 view =
1335 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1336
1337 plan.add_detail(format!(
1338 "Sort time: {:.3}ms",
1339 sort_start.elapsed().as_secs_f64() * 1000.0
1340 ));
1341 plan.end_step();
1342 }
1343 }
1344
1345 if let Some(limit) = statement.limit {
1347 let offset = statement.offset.unwrap_or(0);
1348 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1349 plan.set_rows_in(view.row_count());
1350 if offset > 0 {
1351 plan.add_detail(format!("OFFSET: {}", offset));
1352 }
1353 view = view.with_limit(limit, offset);
1354 plan.set_rows_out(view.row_count());
1355 plan.add_detail(format!("Output: {} rows", view.row_count()));
1356 plan.end_step();
1357 }
1358
1359 Ok(view)
1360 }
1361
1362 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1364 let mut indices = Vec::new();
1365 let table_columns = table.column_names();
1366
1367 for col_name in columns {
1368 let index = table_columns
1369 .iter()
1370 .position(|c| c.eq_ignore_ascii_case(col_name))
1371 .ok_or_else(|| {
1372 let suggestion = self.find_similar_column(table, col_name);
1373 match suggestion {
1374 Some(similar) => anyhow::anyhow!(
1375 "Column '{}' not found. Did you mean '{}'?",
1376 col_name,
1377 similar
1378 ),
1379 None => anyhow::anyhow!("Column '{}' not found", col_name),
1380 }
1381 })?;
1382 indices.push(index);
1383 }
1384
1385 Ok(indices)
1386 }
1387
1388 fn apply_select_items(
1390 &self,
1391 view: DataView,
1392 select_items: &[SelectItem],
1393 statement: &SelectStatement,
1394 exec_context: Option<&ExecutionContext>,
1395 ) -> Result<DataView> {
1396 debug!(
1397 "QueryEngine::apply_select_items - items: {:?}",
1398 select_items
1399 );
1400 debug!(
1401 "QueryEngine::apply_select_items - input view has {} rows",
1402 view.row_count()
1403 );
1404
1405 let has_unnest = select_items.iter().any(|item| match item {
1407 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1408 _ => false,
1409 });
1410
1411 if has_unnest {
1412 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1413 return self.apply_select_with_row_expansion(view, select_items);
1414 }
1415
1416 let has_aggregates = select_items.iter().any(|item| match item {
1420 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1421 SelectItem::Column(_) => false,
1422 SelectItem::Star => false,
1423 });
1424
1425 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1426 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1427 SelectItem::Column(_) => false, SelectItem::Star => false, });
1430
1431 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1432 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1435 return self.apply_aggregate_select(view, select_items);
1436 }
1437
1438 let has_computed_expressions = select_items
1440 .iter()
1441 .any(|item| matches!(item, SelectItem::Expression { .. }));
1442
1443 debug!(
1444 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1445 has_computed_expressions
1446 );
1447
1448 if !has_computed_expressions {
1449 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1451 return Ok(view.with_columns(column_indices));
1452 }
1453
1454 let source_table = view.source();
1459 let visible_rows = view.visible_row_indices();
1460
1461 let mut computed_table = DataTable::new("query_result");
1464
1465 let mut expanded_items = Vec::new();
1467 for item in select_items {
1468 match item {
1469 SelectItem::Star => {
1470 for col_name in source_table.column_names() {
1472 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1473 col_name.to_string(),
1474 )));
1475 }
1476 }
1477 _ => expanded_items.push(item.clone()),
1478 }
1479 }
1480
1481 let mut column_name_counts: std::collections::HashMap<String, usize> =
1483 std::collections::HashMap::new();
1484
1485 for item in &expanded_items {
1486 let base_name = match item {
1487 SelectItem::Column(col_ref) => col_ref.name.clone(),
1488 SelectItem::Expression { alias, .. } => alias.clone(),
1489 SelectItem::Star => unreachable!("Star should have been expanded"),
1490 };
1491
1492 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1494 let column_name = if *count == 0 {
1495 base_name.clone()
1497 } else {
1498 format!("{base_name}_{count}")
1500 };
1501 *count += 1;
1502
1503 computed_table.add_column(DataColumn::new(&column_name));
1504 }
1505
1506 let mut evaluator =
1508 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1509
1510 if let Some(exec_ctx) = exec_context {
1512 let aliases = exec_ctx.get_aliases();
1513 if !aliases.is_empty() {
1514 debug!(
1515 "Applying {} aliases to evaluator: {:?}",
1516 aliases.len(),
1517 aliases
1518 );
1519 evaluator = evaluator.with_table_aliases(aliases);
1520 }
1521 }
1522
1523 for &row_idx in visible_rows {
1524 let mut row_values = Vec::new();
1525
1526 for item in &expanded_items {
1527 let value = match item {
1528 SelectItem::Column(col_ref) => {
1529 match evaluator.evaluate(&SqlExpression::Column(col_ref.clone()), row_idx) {
1531 Ok(val) => val,
1532 Err(e) => {
1533 return Err(anyhow!(
1534 "Failed to evaluate column {}: {}",
1535 col_ref.to_sql(),
1536 e
1537 ));
1538 }
1539 }
1540 }
1541 SelectItem::Expression { expr, .. } => {
1542 evaluator.evaluate(expr, row_idx)?
1544 }
1545 SelectItem::Star => unreachable!("Star should have been expanded"),
1546 };
1547 row_values.push(value);
1548 }
1549
1550 computed_table
1551 .add_row(DataRow::new(row_values))
1552 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
1553 }
1554
1555 Ok(DataView::new(Arc::new(computed_table)))
1558 }
1559
1560 fn apply_select_with_row_expansion(
1562 &self,
1563 view: DataView,
1564 select_items: &[SelectItem],
1565 ) -> Result<DataView> {
1566 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
1567
1568 let source_table = view.source();
1569 let visible_rows = view.visible_row_indices();
1570 let expander_registry = RowExpanderRegistry::new();
1571
1572 let mut result_table = DataTable::new("unnest_result");
1574
1575 let mut expanded_items = Vec::new();
1577 for item in select_items {
1578 match item {
1579 SelectItem::Star => {
1580 for col_name in source_table.column_names() {
1581 expanded_items.push(SelectItem::Column(ColumnRef::unquoted(
1582 col_name.to_string(),
1583 )));
1584 }
1585 }
1586 _ => expanded_items.push(item.clone()),
1587 }
1588 }
1589
1590 for item in &expanded_items {
1592 let column_name = match item {
1593 SelectItem::Column(col_ref) => col_ref.name.clone(),
1594 SelectItem::Expression { alias, .. } => alias.clone(),
1595 SelectItem::Star => unreachable!("Star should have been expanded"),
1596 };
1597 result_table.add_column(DataColumn::new(&column_name));
1598 }
1599
1600 let mut evaluator =
1602 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
1603
1604 for &row_idx in visible_rows {
1605 let mut unnest_expansions = Vec::new();
1607 let mut unnest_indices = Vec::new();
1608
1609 for (col_idx, item) in expanded_items.iter().enumerate() {
1610 if let SelectItem::Expression { expr, .. } = item {
1611 if let Some(expansion_result) = self.try_expand_unnest(
1612 expr,
1613 source_table,
1614 row_idx,
1615 &mut evaluator,
1616 &expander_registry,
1617 )? {
1618 unnest_expansions.push(expansion_result);
1619 unnest_indices.push(col_idx);
1620 }
1621 }
1622 }
1623
1624 let expansion_count = if unnest_expansions.is_empty() {
1626 1 } else {
1628 unnest_expansions
1629 .iter()
1630 .map(|exp| exp.row_count())
1631 .max()
1632 .unwrap_or(1)
1633 };
1634
1635 for output_idx in 0..expansion_count {
1637 let mut row_values = Vec::new();
1638
1639 for (col_idx, item) in expanded_items.iter().enumerate() {
1640 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
1642
1643 let value = if let Some(unnest_idx) = unnest_position {
1644 let expansion = &unnest_expansions[unnest_idx];
1646 expansion
1647 .values
1648 .get(output_idx)
1649 .cloned()
1650 .unwrap_or(DataValue::Null)
1651 } else {
1652 match item {
1654 SelectItem::Column(col_ref) => {
1655 let col_idx =
1656 source_table.get_column_index(&col_ref.name).ok_or_else(
1657 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
1658 )?;
1659 let row = source_table
1660 .get_row(row_idx)
1661 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
1662 row.get(col_idx)
1663 .ok_or_else(|| {
1664 anyhow::anyhow!("Column {} not found in row", col_idx)
1665 })?
1666 .clone()
1667 }
1668 SelectItem::Expression { expr, .. } => {
1669 evaluator.evaluate(expr, row_idx)?
1671 }
1672 SelectItem::Star => unreachable!(),
1673 }
1674 };
1675
1676 row_values.push(value);
1677 }
1678
1679 result_table
1680 .add_row(DataRow::new(row_values))
1681 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
1682 }
1683 }
1684
1685 debug!(
1686 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
1687 visible_rows.len(),
1688 result_table.row_count()
1689 );
1690
1691 Ok(DataView::new(Arc::new(result_table)))
1692 }
1693
1694 fn try_expand_unnest(
1697 &self,
1698 expr: &SqlExpression,
1699 _source_table: &DataTable,
1700 row_idx: usize,
1701 evaluator: &mut ArithmeticEvaluator,
1702 expander_registry: &RowExpanderRegistry,
1703 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
1704 if let SqlExpression::Unnest { column, delimiter } = expr {
1706 let column_value = evaluator.evaluate(column, row_idx)?;
1708
1709 let delimiter_value = DataValue::String(delimiter.clone());
1711
1712 let expander = expander_registry
1714 .get("UNNEST")
1715 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1716
1717 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1719 return Ok(Some(expansion));
1720 }
1721
1722 if let SqlExpression::FunctionCall { name, args, .. } = expr {
1724 if name.to_uppercase() == "UNNEST" {
1725 if args.len() != 2 {
1727 return Err(anyhow::anyhow!(
1728 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
1729 ));
1730 }
1731
1732 let column_value = evaluator.evaluate(&args[0], row_idx)?;
1734
1735 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
1737
1738 let expander = expander_registry
1740 .get("UNNEST")
1741 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
1742
1743 let expansion = expander.expand(&column_value, &[delimiter_value])?;
1745 return Ok(Some(expansion));
1746 }
1747 }
1748
1749 Ok(None)
1750 }
1751
1752 fn apply_aggregate_select(
1754 &self,
1755 view: DataView,
1756 select_items: &[SelectItem],
1757 ) -> Result<DataView> {
1758 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
1759
1760 let source_table = view.source();
1761 let mut result_table = DataTable::new("aggregate_result");
1762
1763 for item in select_items {
1765 let column_name = match item {
1766 SelectItem::Expression { alias, .. } => alias.clone(),
1767 _ => unreachable!("Should only have expressions in aggregate-only query"),
1768 };
1769 result_table.add_column(DataColumn::new(&column_name));
1770 }
1771
1772 let visible_rows = view.visible_row_indices().to_vec();
1774 let mut evaluator =
1775 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
1776 .with_visible_rows(visible_rows);
1777
1778 let mut row_values = Vec::new();
1780 for item in select_items {
1781 match item {
1782 SelectItem::Expression { expr, .. } => {
1783 let value = evaluator.evaluate(expr, 0)?;
1786 row_values.push(value);
1787 }
1788 _ => unreachable!("Should only have expressions in aggregate-only query"),
1789 }
1790 }
1791
1792 result_table
1794 .add_row(DataRow::new(row_values))
1795 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
1796
1797 Ok(DataView::new(Arc::new(result_table)))
1798 }
1799
1800 fn resolve_select_columns(
1802 &self,
1803 table: &DataTable,
1804 select_items: &[SelectItem],
1805 ) -> Result<Vec<usize>> {
1806 let mut indices = Vec::new();
1807 let table_columns = table.column_names();
1808
1809 for item in select_items {
1810 match item {
1811 SelectItem::Column(col_ref) => {
1812 let index = if let Some(table_prefix) = &col_ref.table_prefix {
1814 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
1816 table.find_column_by_qualified_name(&qualified_name)
1817 .ok_or_else(|| {
1818 let has_qualified = table.columns.iter()
1820 .any(|c| c.qualified_name.is_some());
1821 if !has_qualified {
1822 anyhow::anyhow!(
1823 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
1824 qualified_name, table_prefix
1825 )
1826 } else {
1827 anyhow::anyhow!("Column '{}' not found", qualified_name)
1828 }
1829 })?
1830 } else {
1831 table_columns
1833 .iter()
1834 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
1835 .ok_or_else(|| {
1836 let suggestion = self.find_similar_column(table, &col_ref.name);
1837 match suggestion {
1838 Some(similar) => anyhow::anyhow!(
1839 "Column '{}' not found. Did you mean '{}'?",
1840 col_ref.name,
1841 similar
1842 ),
1843 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
1844 }
1845 })?
1846 };
1847 indices.push(index);
1848 }
1849 SelectItem::Star => {
1850 for i in 0..table_columns.len() {
1852 indices.push(i);
1853 }
1854 }
1855 SelectItem::Expression { .. } => {
1856 return Err(anyhow::anyhow!(
1857 "Computed expressions require new table creation"
1858 ));
1859 }
1860 }
1861 }
1862
1863 Ok(indices)
1864 }
1865
1866 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
1868 use std::collections::HashSet;
1869
1870 let source = view.source();
1871 let visible_cols = view.visible_column_indices();
1872 let visible_rows = view.visible_row_indices();
1873
1874 let mut seen_rows = HashSet::new();
1876 let mut unique_row_indices = Vec::new();
1877
1878 for &row_idx in visible_rows {
1879 let mut row_key = Vec::new();
1881 for &col_idx in visible_cols {
1882 let value = source
1883 .get_value(row_idx, col_idx)
1884 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
1885 row_key.push(format!("{:?}", value));
1887 }
1888
1889 if seen_rows.insert(row_key) {
1891 unique_row_indices.push(row_idx);
1893 }
1894 }
1895
1896 Ok(view.with_rows(unique_row_indices))
1898 }
1899
1900 fn apply_multi_order_by(
1902 &self,
1903 mut view: DataView,
1904 order_by_columns: &[OrderByColumn],
1905 ) -> Result<DataView> {
1906 self.apply_multi_order_by_with_context(view, order_by_columns, None)
1907 }
1908
1909 fn apply_multi_order_by_with_context(
1911 &self,
1912 mut view: DataView,
1913 order_by_columns: &[OrderByColumn],
1914 exec_context: Option<&ExecutionContext>,
1915 ) -> Result<DataView> {
1916 let mut sort_columns = Vec::new();
1918
1919 for order_col in order_by_columns {
1920 let col_index = if order_col.column.contains('.') {
1922 if let Some(dot_pos) = order_col.column.rfind('.') {
1924 let col_name = &order_col.column[dot_pos + 1..];
1925
1926 debug!(
1929 "ORDER BY: Extracting unqualified column '{}' from '{}'",
1930 col_name, order_col.column
1931 );
1932 view.source().get_column_index(col_name)
1933 } else {
1934 view.source().get_column_index(&order_col.column)
1935 }
1936 } else {
1937 view.source().get_column_index(&order_col.column)
1939 }
1940 .ok_or_else(|| {
1941 let suggestion = self.find_similar_column(view.source(), &order_col.column);
1943 match suggestion {
1944 Some(similar) => anyhow::anyhow!(
1945 "Column '{}' not found. Did you mean '{}'?",
1946 order_col.column,
1947 similar
1948 ),
1949 None => {
1950 let available_cols = view.source().column_names().join(", ");
1952 anyhow::anyhow!(
1953 "Column '{}' not found. Available columns: {}",
1954 order_col.column,
1955 available_cols
1956 )
1957 }
1958 }
1959 })?;
1960
1961 let ascending = matches!(order_col.direction, SortDirection::Asc);
1962 sort_columns.push((col_index, ascending));
1963 }
1964
1965 view.apply_multi_sort(&sort_columns)?;
1967 Ok(view)
1968 }
1969
1970 fn apply_group_by(
1972 &self,
1973 view: DataView,
1974 group_by_exprs: &[SqlExpression],
1975 select_items: &[SelectItem],
1976 having: Option<&SqlExpression>,
1977 plan: &mut ExecutionPlanBuilder,
1978 ) -> Result<DataView> {
1979 let (result_view, phase_info) = self.apply_group_by_expressions(
1981 view,
1982 group_by_exprs,
1983 select_items,
1984 having,
1985 self.case_insensitive,
1986 self.date_notation.clone(),
1987 )?;
1988
1989 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
1991 plan.add_detail(format!(
1992 "Phase 1 - Group Building: {:.3}ms",
1993 phase_info.phase2_key_building.as_secs_f64() * 1000.0
1994 ));
1995 plan.add_detail(format!(
1996 " • Processing {} rows into {} groups",
1997 phase_info.total_rows, phase_info.num_groups
1998 ));
1999 plan.add_detail(format!(
2000 "Phase 2 - Aggregation: {:.3}ms",
2001 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2002 ));
2003 if phase_info.phase4_having_evaluation > Duration::ZERO {
2004 plan.add_detail(format!(
2005 "Phase 3 - HAVING Filter: {:.3}ms",
2006 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2007 ));
2008 plan.add_detail(format!(
2009 " • Filtered {} groups",
2010 phase_info.groups_filtered_by_having
2011 ));
2012 }
2013 plan.add_detail(format!(
2014 "Total GROUP BY time: {:.3}ms",
2015 phase_info.total_time.as_secs_f64() * 1000.0
2016 ));
2017
2018 Ok(result_view)
2019 }
2020
2021 pub fn estimate_group_cardinality(
2024 &self,
2025 view: &DataView,
2026 group_by_exprs: &[SqlExpression],
2027 ) -> usize {
2028 let row_count = view.get_visible_rows().len();
2030 if row_count <= 100 {
2031 return row_count;
2032 }
2033
2034 let sample_size = min(1000, row_count / 10).max(100);
2036 let mut seen = FxHashSet::default();
2037
2038 let visible_rows = view.get_visible_rows();
2039 for (i, &row_idx) in visible_rows.iter().enumerate() {
2040 if i >= sample_size {
2041 break;
2042 }
2043
2044 let mut key_values = Vec::new();
2046 for expr in group_by_exprs {
2047 let mut evaluator = ArithmeticEvaluator::new(view.source());
2048 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2049 key_values.push(value);
2050 }
2051
2052 seen.insert(key_values);
2053 }
2054
2055 let sample_cardinality = seen.len();
2057 let estimated = (sample_cardinality * row_count) / sample_size;
2058
2059 estimated.min(row_count).max(sample_cardinality)
2061 }
2062}
2063
2064#[cfg(test)]
2065mod tests {
2066 use super::*;
2067 use crate::data::datatable::{DataColumn, DataRow, DataValue};
2068
2069 fn create_test_table() -> Arc<DataTable> {
2070 let mut table = DataTable::new("test");
2071
2072 table.add_column(DataColumn::new("id"));
2074 table.add_column(DataColumn::new("name"));
2075 table.add_column(DataColumn::new("age"));
2076
2077 table
2079 .add_row(DataRow::new(vec![
2080 DataValue::Integer(1),
2081 DataValue::String("Alice".to_string()),
2082 DataValue::Integer(30),
2083 ]))
2084 .unwrap();
2085
2086 table
2087 .add_row(DataRow::new(vec![
2088 DataValue::Integer(2),
2089 DataValue::String("Bob".to_string()),
2090 DataValue::Integer(25),
2091 ]))
2092 .unwrap();
2093
2094 table
2095 .add_row(DataRow::new(vec![
2096 DataValue::Integer(3),
2097 DataValue::String("Charlie".to_string()),
2098 DataValue::Integer(35),
2099 ]))
2100 .unwrap();
2101
2102 Arc::new(table)
2103 }
2104
2105 #[test]
2106 fn test_select_all() {
2107 let table = create_test_table();
2108 let engine = QueryEngine::new();
2109
2110 let view = engine
2111 .execute(table.clone(), "SELECT * FROM users")
2112 .unwrap();
2113 assert_eq!(view.row_count(), 3);
2114 assert_eq!(view.column_count(), 3);
2115 }
2116
2117 #[test]
2118 fn test_select_columns() {
2119 let table = create_test_table();
2120 let engine = QueryEngine::new();
2121
2122 let view = engine
2123 .execute(table.clone(), "SELECT name, age FROM users")
2124 .unwrap();
2125 assert_eq!(view.row_count(), 3);
2126 assert_eq!(view.column_count(), 2);
2127 }
2128
2129 #[test]
2130 fn test_select_with_limit() {
2131 let table = create_test_table();
2132 let engine = QueryEngine::new();
2133
2134 let view = engine
2135 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
2136 .unwrap();
2137 assert_eq!(view.row_count(), 2);
2138 }
2139
2140 #[test]
2141 fn test_type_coercion_contains() {
2142 let _ = tracing_subscriber::fmt()
2144 .with_max_level(tracing::Level::DEBUG)
2145 .try_init();
2146
2147 let mut table = DataTable::new("test");
2148 table.add_column(DataColumn::new("id"));
2149 table.add_column(DataColumn::new("status"));
2150 table.add_column(DataColumn::new("price"));
2151
2152 table
2154 .add_row(DataRow::new(vec![
2155 DataValue::Integer(1),
2156 DataValue::String("Pending".to_string()),
2157 DataValue::Float(99.99),
2158 ]))
2159 .unwrap();
2160
2161 table
2162 .add_row(DataRow::new(vec![
2163 DataValue::Integer(2),
2164 DataValue::String("Confirmed".to_string()),
2165 DataValue::Float(150.50),
2166 ]))
2167 .unwrap();
2168
2169 table
2170 .add_row(DataRow::new(vec![
2171 DataValue::Integer(3),
2172 DataValue::String("Pending".to_string()),
2173 DataValue::Float(75.00),
2174 ]))
2175 .unwrap();
2176
2177 let table = Arc::new(table);
2178 let engine = QueryEngine::new();
2179
2180 println!("\n=== Testing WHERE clause with Contains ===");
2181 println!("Table has {} rows", table.row_count());
2182 for i in 0..table.row_count() {
2183 let status = table.get_value(i, 1);
2184 println!("Row {i}: status = {status:?}");
2185 }
2186
2187 println!("\n--- Test 1: status.Contains('pend') ---");
2189 let result = engine.execute(
2190 table.clone(),
2191 "SELECT * FROM test WHERE status.Contains('pend')",
2192 );
2193 match result {
2194 Ok(view) => {
2195 println!("SUCCESS: Found {} matching rows", view.row_count());
2196 assert_eq!(view.row_count(), 2); }
2198 Err(e) => {
2199 panic!("Query failed: {e}");
2200 }
2201 }
2202
2203 println!("\n--- Test 2: price.Contains('9') ---");
2205 let result = engine.execute(
2206 table.clone(),
2207 "SELECT * FROM test WHERE price.Contains('9')",
2208 );
2209 match result {
2210 Ok(view) => {
2211 println!(
2212 "SUCCESS: Found {} matching rows with price containing '9'",
2213 view.row_count()
2214 );
2215 assert!(view.row_count() >= 1);
2217 }
2218 Err(e) => {
2219 panic!("Numeric coercion query failed: {e}");
2220 }
2221 }
2222
2223 println!("\n=== All tests passed! ===");
2224 }
2225
2226 #[test]
2227 fn test_not_in_clause() {
2228 let _ = tracing_subscriber::fmt()
2230 .with_max_level(tracing::Level::DEBUG)
2231 .try_init();
2232
2233 let mut table = DataTable::new("test");
2234 table.add_column(DataColumn::new("id"));
2235 table.add_column(DataColumn::new("country"));
2236
2237 table
2239 .add_row(DataRow::new(vec![
2240 DataValue::Integer(1),
2241 DataValue::String("CA".to_string()),
2242 ]))
2243 .unwrap();
2244
2245 table
2246 .add_row(DataRow::new(vec![
2247 DataValue::Integer(2),
2248 DataValue::String("US".to_string()),
2249 ]))
2250 .unwrap();
2251
2252 table
2253 .add_row(DataRow::new(vec![
2254 DataValue::Integer(3),
2255 DataValue::String("UK".to_string()),
2256 ]))
2257 .unwrap();
2258
2259 let table = Arc::new(table);
2260 let engine = QueryEngine::new();
2261
2262 println!("\n=== Testing NOT IN clause ===");
2263 println!("Table has {} rows", table.row_count());
2264 for i in 0..table.row_count() {
2265 let country = table.get_value(i, 1);
2266 println!("Row {i}: country = {country:?}");
2267 }
2268
2269 println!("\n--- Test: country NOT IN ('CA') ---");
2271 let result = engine.execute(
2272 table.clone(),
2273 "SELECT * FROM test WHERE country NOT IN ('CA')",
2274 );
2275 match result {
2276 Ok(view) => {
2277 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
2278 assert_eq!(view.row_count(), 2); }
2280 Err(e) => {
2281 panic!("NOT IN query failed: {e}");
2282 }
2283 }
2284
2285 println!("\n=== NOT IN test complete! ===");
2286 }
2287
2288 #[test]
2289 fn test_case_insensitive_in_and_not_in() {
2290 let _ = tracing_subscriber::fmt()
2292 .with_max_level(tracing::Level::DEBUG)
2293 .try_init();
2294
2295 let mut table = DataTable::new("test");
2296 table.add_column(DataColumn::new("id"));
2297 table.add_column(DataColumn::new("country"));
2298
2299 table
2301 .add_row(DataRow::new(vec![
2302 DataValue::Integer(1),
2303 DataValue::String("CA".to_string()), ]))
2305 .unwrap();
2306
2307 table
2308 .add_row(DataRow::new(vec![
2309 DataValue::Integer(2),
2310 DataValue::String("us".to_string()), ]))
2312 .unwrap();
2313
2314 table
2315 .add_row(DataRow::new(vec![
2316 DataValue::Integer(3),
2317 DataValue::String("UK".to_string()), ]))
2319 .unwrap();
2320
2321 let table = Arc::new(table);
2322
2323 println!("\n=== Testing Case-Insensitive IN clause ===");
2324 println!("Table has {} rows", table.row_count());
2325 for i in 0..table.row_count() {
2326 let country = table.get_value(i, 1);
2327 println!("Row {i}: country = {country:?}");
2328 }
2329
2330 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
2332 let engine = QueryEngine::with_case_insensitive(true);
2333 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2334 match result {
2335 Ok(view) => {
2336 println!(
2337 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
2338 view.row_count()
2339 );
2340 assert_eq!(view.row_count(), 1); }
2342 Err(e) => {
2343 panic!("Case-insensitive IN query failed: {e}");
2344 }
2345 }
2346
2347 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
2349 let result = engine.execute(
2350 table.clone(),
2351 "SELECT * FROM test WHERE country NOT IN ('ca')",
2352 );
2353 match result {
2354 Ok(view) => {
2355 println!(
2356 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
2357 view.row_count()
2358 );
2359 assert_eq!(view.row_count(), 2); }
2361 Err(e) => {
2362 panic!("Case-insensitive NOT IN query failed: {e}");
2363 }
2364 }
2365
2366 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
2368 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
2370 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
2371 match result {
2372 Ok(view) => {
2373 println!(
2374 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
2375 view.row_count()
2376 );
2377 assert_eq!(view.row_count(), 0); }
2379 Err(e) => {
2380 panic!("Case-sensitive IN query failed: {e}");
2381 }
2382 }
2383
2384 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
2385 }
2386
2387 #[test]
2388 #[ignore = "Parentheses in WHERE clause not yet implemented"]
2389 fn test_parentheses_in_where_clause() {
2390 let _ = tracing_subscriber::fmt()
2392 .with_max_level(tracing::Level::DEBUG)
2393 .try_init();
2394
2395 let mut table = DataTable::new("test");
2396 table.add_column(DataColumn::new("id"));
2397 table.add_column(DataColumn::new("status"));
2398 table.add_column(DataColumn::new("priority"));
2399
2400 table
2402 .add_row(DataRow::new(vec![
2403 DataValue::Integer(1),
2404 DataValue::String("Pending".to_string()),
2405 DataValue::String("High".to_string()),
2406 ]))
2407 .unwrap();
2408
2409 table
2410 .add_row(DataRow::new(vec![
2411 DataValue::Integer(2),
2412 DataValue::String("Complete".to_string()),
2413 DataValue::String("High".to_string()),
2414 ]))
2415 .unwrap();
2416
2417 table
2418 .add_row(DataRow::new(vec![
2419 DataValue::Integer(3),
2420 DataValue::String("Pending".to_string()),
2421 DataValue::String("Low".to_string()),
2422 ]))
2423 .unwrap();
2424
2425 table
2426 .add_row(DataRow::new(vec![
2427 DataValue::Integer(4),
2428 DataValue::String("Complete".to_string()),
2429 DataValue::String("Low".to_string()),
2430 ]))
2431 .unwrap();
2432
2433 let table = Arc::new(table);
2434 let engine = QueryEngine::new();
2435
2436 println!("\n=== Testing Parentheses in WHERE clause ===");
2437 println!("Table has {} rows", table.row_count());
2438 for i in 0..table.row_count() {
2439 let status = table.get_value(i, 1);
2440 let priority = table.get_value(i, 2);
2441 println!("Row {i}: status = {status:?}, priority = {priority:?}");
2442 }
2443
2444 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
2446 let result = engine.execute(
2447 table.clone(),
2448 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
2449 );
2450 match result {
2451 Ok(view) => {
2452 println!(
2453 "SUCCESS: Found {} rows with parenthetical logic",
2454 view.row_count()
2455 );
2456 assert_eq!(view.row_count(), 2); }
2458 Err(e) => {
2459 panic!("Parentheses query failed: {e}");
2460 }
2461 }
2462
2463 println!("\n=== Parentheses test complete! ===");
2464 }
2465
#[test]
#[ignore = "Numeric type coercion needs fixing"]
fn test_numeric_type_coercion() {
    // Best-effort DEBUG tracing; `try_init` returns Err harmlessly if a subscriber
    // is already installed by another test.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    table.add_column(DataColumn::new("id"));
    table.add_column(DataColumn::new("price"));
    table.add_column(DataColumn::new("quantity"));

    // `price` deliberately mixes Float and Integer values so that string methods
    // such as Contains must coerce numbers to text.
    table
        .add_row(DataRow::new(vec![
            DataValue::Integer(1),
            DataValue::Float(99.50),
            DataValue::Integer(100),
        ]))
        .unwrap();

    table
        .add_row(DataRow::new(vec![
            DataValue::Integer(2),
            DataValue::Float(150.0),
            DataValue::Integer(200),
        ]))
        .unwrap();

    table
        .add_row(DataRow::new(vec![
            DataValue::Integer(3),
            DataValue::Integer(75),
            DataValue::Integer(50),
        ]))
        .unwrap();

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Numeric Type Coercion ===");
    println!("Table has {} rows", table.row_count());
    for i in 0..table.row_count() {
        let price = table.get_value(i, 1);
        let quantity = table.get_value(i, 2);
        println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
    }

    println!("\n--- Test: price.Contains('.') ---");
    // NOTE(review): expecting 2 rows assumes Float(150.0) coerces to a string that
    // keeps a decimal point (e.g. "150.0"). Rust's default f64 Display renders it as
    // "150", in which case only 99.5 would match. Confirm against DataValue's
    // string coercion when un-ignoring this test.
    let result = engine.execute(
        table.clone(),
        "SELECT * FROM test WHERE price.Contains('.')",
    );
    match result {
        Ok(view) => {
            println!(
                "SUCCESS: Found {} rows with decimal points in price",
                view.row_count()
            );
            assert_eq!(view.row_count(), 2);
        }
        Err(e) => {
            panic!("Numeric Contains query failed: {e}");
        }
    }

    println!("\n--- Test: quantity.Contains('0') ---");
    // Every quantity (100, 200 and 50) contains a '0' digit, so all three rows must
    // match. The previous expectation of 2 was wrong and would fail as soon as the
    // coercion is implemented and the test is un-ignored.
    let result = engine.execute(
        table.clone(),
        "SELECT * FROM test WHERE quantity.Contains('0')",
    );
    match result {
        Ok(view) => {
            println!(
                "SUCCESS: Found {} rows with '0' in quantity",
                view.row_count()
            );
            assert_eq!(view.row_count(), 3);
        }
        Err(e) => {
            panic!("Integer Contains query failed: {e}");
        }
    }

    println!("\n=== Numeric type coercion test complete! ===");
}
2555
#[test]
fn test_datetime_comparisons() {
    // Best-effort DEBUG tracing; ignore the error when a subscriber already exists.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "created_date"] {
        table.add_column(DataColumn::new(column));
    }

    // Dates are stored as plain strings; one falls before the 2025-01-01 cutoff
    // and two fall after it.
    let fixtures = [(1, "2024-12-15"), (2, "2025-01-15"), (3, "2025-02-15")];
    for (id, created) in fixtures {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(created.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing DateTime Comparisons ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let date = table.get_value(row_idx, 1);
        println!("Row {row_idx}: created_date = {date:?}");
    }

    println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
    let view = engine
        .execute(
            table.clone(),
            "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
        )
        .unwrap_or_else(|e| panic!("DateTime comparison query failed: {e}"));
    println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
    assert_eq!(view.row_count(), 2);

    println!("\n=== DateTime comparison test complete! ===");
}
2617
#[test]
fn test_not_with_method_calls() {
    // Best-effort DEBUG tracing; a repeated init in the same process just returns Err.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status"] {
        table.add_column(DataColumn::new(column));
    }

    // Two of the three statuses begin with "Pending"; only "Complete" does not.
    let statuses = [(1, "Pending Review"), (2, "Complete"), (3, "Pending Approval")];
    for (id, status) in statuses {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    // Case-insensitive engine: the lowercase needle 'pend' is expected to match
    // "Pending ..." (the test expects only one row to survive the NOT).
    let engine = QueryEngine::with_case_insensitive(true);

    println!("\n=== Testing NOT with Method Calls ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let status = table.get_value(row_idx, 1);
        println!("Row {row_idx}: status = {status:?}");
    }

    // (title, SQL, success-message tail, failure label, expected row count)
    let cases = [
        (
            "NOT status.Contains('pend')",
            "SELECT * FROM test WHERE NOT status.Contains('pend')",
            "NOT containing 'pend'",
            "NOT Contains",
            1,
        ),
        (
            "NOT status.StartsWith('Pending')",
            "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
            "NOT starting with 'Pending'",
            "NOT StartsWith",
            1,
        ),
    ];
    for (title, sql, found_what, label, expected) in cases {
        println!("\n--- Test: {title} ---");
        let view = engine
            .execute(table.clone(), sql)
            .unwrap_or_else(|e| panic!("{label} query failed: {e}"));
        println!("SUCCESS: Found {} rows {found_what}", view.row_count());
        assert_eq!(view.row_count(), expected);
    }

    println!("\n=== NOT with method calls test complete! ===");
}
2701
#[test]
#[ignore = "Complex logical expressions with parentheses not yet implemented"]
fn test_complex_logical_expressions() {
    // Best-effort DEBUG tracing; ignore the error if a subscriber is already set.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "status", "priority", "assigned"] {
        table.add_column(DataColumn::new(column));
    }

    // (id, status, priority, assigned)
    let tasks = [
        (1, "Pending", "High", "John"),
        (2, "Complete", "High", "Jane"),
        (3, "Pending", "Low", "John"),
        (4, "In Progress", "Medium", "Jane"),
    ];
    for (id, status, priority, assigned) in tasks {
        table
            .add_row(DataRow::new(vec![
                DataValue::Integer(id),
                DataValue::String(status.to_string()),
                DataValue::String(priority.to_string()),
                DataValue::String(assigned.to_string()),
            ]))
            .unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Complex Logical Expressions ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let status = table.get_value(row_idx, 1);
        let priority = table.get_value(row_idx, 2);
        let assigned = table.get_value(row_idx, 3);
        println!(
            "Row {row_idx}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
        );
    }

    // (title, SQL, success-message tail, failure label, expected row count)
    // Case 1 matches rows 1 and 3; case 2 matches rows 1 and 4.
    let cases = [
        (
            "status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
            "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
            "with complex logic",
            "Complex logic",
            2,
        ),
        (
            "NOT (status.Contains('Complete') OR priority = 'Low')",
            "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
            "with NOT complex logic",
            "NOT complex logic",
            2,
        ),
    ];
    for (title, sql, found_what, label, expected) in cases {
        println!("\n--- Test: {title} ---");
        let view = engine
            .execute(table.clone(), sql)
            .unwrap_or_else(|e| panic!("{label} query failed: {e}"));
        println!("SUCCESS: Found {} rows {found_what}", view.row_count());
        assert_eq!(view.row_count(), expected);
    }

    println!("\n=== Complex logical expressions test complete! ===");
}
2807
#[test]
fn test_mixed_data_types_and_edge_cases() {
    // Best-effort DEBUG tracing; ignore the error if a subscriber is already set.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .try_init();

    let mut table = DataTable::new("test");
    for column in ["id", "value", "nullable_field"] {
        table.add_column(DataColumn::new(column));
    }

    // `value` holds a different DataValue variant in each row; `nullable_field`
    // alternates between text and Null.
    let rows = vec![
        vec![
            DataValue::Integer(1),
            DataValue::String("123.45".to_string()),
            DataValue::String("present".to_string()),
        ],
        vec![
            DataValue::Integer(2),
            DataValue::Float(678.90),
            DataValue::Null,
        ],
        vec![
            DataValue::Integer(3),
            DataValue::Boolean(true),
            DataValue::String("also present".to_string()),
        ],
        vec![
            DataValue::Integer(4),
            DataValue::String("false".to_string()),
            DataValue::Null,
        ],
    ];
    for values in rows {
        table.add_row(DataRow::new(values)).unwrap();
    }

    let table = Arc::new(table);
    let engine = QueryEngine::new();

    println!("\n=== Testing Mixed Data Types and Edge Cases ===");
    println!("Table has {} rows", table.row_count());
    for row_idx in 0..table.row_count() {
        let value = table.get_value(row_idx, 1);
        let nullable = table.get_value(row_idx, 2);
        println!("Row {row_idx}: value = {value:?}, nullable_field = {nullable:?}");
    }

    // (title, SQL, success-message tail, failure label, expected row count)
    let cases = [
        (
            "value.Contains('true') (boolean to string coercion)",
            "SELECT * FROM test WHERE value.Contains('true')",
            "with boolean coercion",
            "Boolean coercion",
            1,
        ),
        (
            "id IN (1, 3)",
            "SELECT * FROM test WHERE id IN (1, 3)",
            "with IN clause",
            "Multiple IN values",
            2,
        ),
    ];
    for (title, sql, found_what, label, expected) in cases {
        println!("\n--- Test: {title} ---");
        let view = engine
            .execute(table.clone(), sql)
            .unwrap_or_else(|e| panic!("{label} query failed: {e}"));
        println!("SUCCESS: Found {} rows {found_what}", view.row_count());
        assert_eq!(view.row_count(), expected);
    }

    println!("\n=== Mixed data types test complete! ===");
}
2898
#[test]
fn test_aggregate_only_single_row() {
    let table = create_test_stock_data();
    let engine = QueryEngine::new();

    // Aggregates with no GROUP BY must collapse the whole table to a single row.
    let sql = "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock";
    let result = engine
        .execute(table.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Aggregate-only query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    // COUNT, MIN and MAX are exact; AVG is compared with a tolerance below.
    let exact = [
        DataValue::Integer(5),
        DataValue::Float(99.5),
        DataValue::Float(105.0),
    ];
    for (column, want) in exact.iter().enumerate() {
        assert_eq!(&row.values[column], want);
    }

    match &row.values[3] {
        DataValue::Float(avg) => assert!(
            (avg - 102.4).abs() < 0.01,
            "Average should be approximately 102.4, got {avg}"
        ),
        _ => panic!("AVG should return a Float value"),
    }
}
2944
#[test]
fn test_single_aggregate_single_row() {
    let table = create_test_stock_data();
    let engine = QueryEngine::new();

    // A lone COUNT(*) must still yield exactly one row with one column.
    let sql = "SELECT COUNT(*) FROM stock";
    let result = engine
        .execute(table.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Single aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 1, "Should have 1 column");

    let source = result.source();
    let first = source.get_row(0).expect("Should have first row");
    let expected_count = DataValue::Integer(5);
    assert_eq!(first.values[0], expected_count);
}
2966
#[test]
fn test_aggregate_with_where_single_row() {
    let table = create_test_stock_data();
    let engine = QueryEngine::new();

    // The filter keeps only the 103.5 and 105.0 closes.
    let sql = "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0";
    let result = engine
        .execute(table.clone(), sql)
        .expect("Query should succeed");

    assert_eq!(
        result.row_count(),
        1,
        "Filtered aggregate query should return exactly 1 row"
    );
    assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");

    let source = result.source();
    let row = source.get_row(0).expect("Should have first row");

    let expected = [
        DataValue::Integer(2),
        DataValue::Float(103.5),
        DataValue::Float(105.0),
    ];
    for (column, want) in expected.iter().enumerate() {
        assert_eq!(&row.values[column], want);
    }
}
2996
#[test]
fn test_not_in_parsing() {
    use crate::sql::recursive_parser::Parser;

    let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
    println!("\n=== Testing NOT IN parsing ===");
    println!("Parsing query: {query}");

    let mut parser = Parser::new(query);
    let statement = parser
        .parse()
        .unwrap_or_else(|e| panic!("Parse error: {e}"));
    println!("Parsed statement: {statement:#?}");

    // The previous version only printed the AST, so a parser that silently dropped
    // the NOT IN predicate would still pass. Require that the WHERE clause exists
    // and carries at least one condition.
    let where_clause = statement
        .where_clause
        .expect("NOT IN query should produce a WHERE clause");
    println!("WHERE conditions: {:#?}", where_clause.conditions);

    let first_condition = where_clause
        .conditions
        .first()
        .expect("WHERE clause should contain the NOT IN condition");
    println!("First condition expression: {:#?}", first_condition.expr);
}
3021
3022 fn create_test_stock_data() -> Arc<DataTable> {
3024 let mut table = DataTable::new("stock");
3025
3026 table.add_column(DataColumn::new("symbol"));
3027 table.add_column(DataColumn::new("close"));
3028 table.add_column(DataColumn::new("volume"));
3029
3030 let test_data = vec![
3032 ("AAPL", 99.5, 1000),
3033 ("AAPL", 101.2, 1500),
3034 ("AAPL", 103.5, 2000),
3035 ("AAPL", 105.0, 1200),
3036 ("AAPL", 102.8, 1800),
3037 ];
3038
3039 for (symbol, close, volume) in test_data {
3040 table
3041 .add_row(DataRow::new(vec![
3042 DataValue::String(symbol.to_string()),
3043 DataValue::Float(close),
3044 DataValue::Integer(volume),
3045 ]))
3046 .expect("Should add row successfully");
3047 }
3048
3049 Arc::new(table)
3050 }
3051}
3052
// A separate test module. `#[path]` loads it from `query_engine_tests.rs` instead of
// the default module path, and `#[cfg(test)]` compiles it only in test builds.
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;