1use anyhow::{anyhow, Result};
2use fxhash::FxHashSet;
3use std::cmp::min;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tracing::{debug, info};
8
9use crate::config::config::BehaviorConfig;
10use crate::config::global::get_date_notation;
11use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
12use crate::data::data_view::DataView;
13use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
14use crate::data::evaluation_context::EvaluationContext;
15use crate::data::group_by_expressions::GroupByExpressions;
16use crate::data::hash_join::HashJoinExecutor;
17use crate::data::recursive_where_evaluator::RecursiveWhereEvaluator;
18use crate::data::row_expanders::RowExpanderRegistry;
19use crate::data::subquery_executor::SubqueryExecutor;
20use crate::data::temp_table_registry::TempTableRegistry;
21use crate::execution_plan::{ExecutionPlan, ExecutionPlanBuilder, StepType};
22use crate::sql::aggregates::{contains_aggregate, is_aggregate_compatible};
23use crate::sql::parser::ast::ColumnRef;
24use crate::sql::parser::ast::SetOperation;
25use crate::sql::parser::ast::TableSource;
26use crate::sql::parser::ast::WindowSpec;
27use crate::sql::recursive_parser::{
28 CTEType, OrderByItem, Parser, SelectItem, SelectStatement, SortDirection, SqlExpression,
29 TableFunction,
30};
31
/// Per-query resolution state used while building a view.
///
/// At present it only tracks table aliases (e.g. the `o` in `FROM orders o`),
/// mapping each alias to the table name it stands for.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Alias -> underlying table name.
    alias_map: HashMap<String, String>,
}
39
40impl ExecutionContext {
41 pub fn new() -> Self {
43 Self {
44 alias_map: HashMap::new(),
45 }
46 }
47
48 pub fn register_alias(&mut self, alias: String, table_name: String) {
50 debug!("Registering alias: {} -> {}", alias, table_name);
51 self.alias_map.insert(alias, table_name);
52 }
53
54 pub fn resolve_alias(&self, name: &str) -> String {
57 self.alias_map
58 .get(name)
59 .cloned()
60 .unwrap_or_else(|| name.to_string())
61 }
62
63 pub fn is_alias(&self, name: &str) -> bool {
65 self.alias_map.contains_key(name)
66 }
67
68 pub fn get_aliases(&self) -> HashMap<String, String> {
70 self.alias_map.clone()
71 }
72
73 pub fn resolve_column_index(&self, table: &DataTable, column_ref: &ColumnRef) -> Result<usize> {
88 if let Some(table_prefix) = &column_ref.table_prefix {
89 let actual_table = self.resolve_alias(table_prefix);
91
92 let qualified_name = format!("{}.{}", actual_table, column_ref.name);
94 if let Some(idx) = table.find_column_by_qualified_name(&qualified_name) {
95 debug!(
96 "Resolved {}.{} -> qualified column '{}' at index {}",
97 table_prefix, column_ref.name, qualified_name, idx
98 );
99 return Ok(idx);
100 }
101
102 if let Some(idx) = table.get_column_index(&column_ref.name) {
104 debug!(
105 "Resolved {}.{} -> unqualified column '{}' at index {}",
106 table_prefix, column_ref.name, column_ref.name, idx
107 );
108 return Ok(idx);
109 }
110
111 Err(anyhow!(
113 "Column '{}' not found. Table '{}' may not support qualified column names",
114 qualified_name,
115 actual_table
116 ))
117 } else {
118 if let Some(idx) = table.get_column_index(&column_ref.name) {
120 debug!(
121 "Resolved unqualified column '{}' at index {}",
122 column_ref.name, idx
123 );
124 return Ok(idx);
125 }
126
127 if column_ref.name.contains('.') {
129 if let Some(idx) = table.find_column_by_qualified_name(&column_ref.name) {
130 debug!(
131 "Resolved '{}' as qualified column at index {}",
132 column_ref.name, idx
133 );
134 return Ok(idx);
135 }
136 }
137
138 let suggestion = self.find_similar_column(table, &column_ref.name);
140 match suggestion {
141 Some(similar) => Err(anyhow!(
142 "Column '{}' not found. Did you mean '{}'?",
143 column_ref.name,
144 similar
145 )),
146 None => Err(anyhow!("Column '{}' not found", column_ref.name)),
147 }
148 }
149 }
150
151 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
153 let columns = table.column_names();
154 let mut best_match: Option<(String, usize)> = None;
155
156 for col in columns {
157 let distance = edit_distance(name, &col);
158 if distance <= 2 {
159 match best_match {
161 Some((_, best_dist)) if distance < best_dist => {
162 best_match = Some((col.clone(), distance));
163 }
164 None => {
165 best_match = Some((col.clone(), distance));
166 }
167 _ => {}
168 }
169 }
170 }
171
172 best_match.map(|(name, _)| name)
173 }
174}
175
176impl Default for ExecutionContext {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
/// Levenshtein distance between `a` and `b`, measured in `char`s.
///
/// Insertions, deletions, and substitutions each cost 1. Only two rows of the
/// dynamic-programming table are kept at a time.
fn edit_distance(a: &str, b: &str) -> usize {
    let left: Vec<char> = a.chars().collect();
    let right: Vec<char> = b.chars().collect();

    // Distance to or from an empty string is the other string's length.
    if left.is_empty() {
        return right.len();
    }
    if right.is_empty() {
        return left.len();
    }

    // `previous[j]` holds the distance between the first `i` chars of `left`
    // and the first `j` chars of `right`; `current` is the row for `i + 1`.
    let mut previous: Vec<usize> = (0..=right.len()).collect();
    let mut current = vec![0; right.len() + 1];

    for (i, &lc) in left.iter().enumerate() {
        current[0] = i + 1;
        for (j, &rc) in right.iter().enumerate() {
            let deletion = previous[j + 1] + 1;
            let insertion = current[j] + 1;
            let substitution = previous[j] + usize::from(lc != rc);
            current[j + 1] = min(min(deletion, insertion), substitution);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    previous[right.len()]
}
222
223#[derive(Clone)]
225pub struct QueryEngine {
226 case_insensitive: bool,
227 date_notation: String,
228 _behavior_config: Option<BehaviorConfig>,
229}
230
231impl Default for QueryEngine {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237impl QueryEngine {
238 #[must_use]
239 pub fn new() -> Self {
240 Self {
241 case_insensitive: false,
242 date_notation: get_date_notation(),
243 _behavior_config: None,
244 }
245 }
246
247 #[must_use]
248 pub fn with_behavior_config(config: BehaviorConfig) -> Self {
249 let case_insensitive = config.case_insensitive_default;
250 let date_notation = get_date_notation();
252 Self {
253 case_insensitive,
254 date_notation,
255 _behavior_config: Some(config),
256 }
257 }
258
259 #[must_use]
260 pub fn with_date_notation(_date_notation: String) -> Self {
261 Self {
262 case_insensitive: false,
263 date_notation: get_date_notation(), _behavior_config: None,
265 }
266 }
267
268 #[must_use]
269 pub fn with_case_insensitive(case_insensitive: bool) -> Self {
270 Self {
271 case_insensitive,
272 date_notation: get_date_notation(),
273 _behavior_config: None,
274 }
275 }
276
277 #[must_use]
278 pub fn with_case_insensitive_and_date_notation(
279 case_insensitive: bool,
280 _date_notation: String, ) -> Self {
282 Self {
283 case_insensitive,
284 date_notation: get_date_notation(), _behavior_config: None,
286 }
287 }
288
289 fn find_similar_column(&self, table: &DataTable, name: &str) -> Option<String> {
291 let columns = table.column_names();
292 let mut best_match: Option<(String, usize)> = None;
293
294 for col in columns {
295 let distance = self.edit_distance(&col.to_lowercase(), &name.to_lowercase());
296 let max_distance = if name.len() > 10 { 3 } else { 2 };
299 if distance <= max_distance {
300 match &best_match {
301 None => best_match = Some((col, distance)),
302 Some((_, best_dist)) if distance < *best_dist => {
303 best_match = Some((col, distance));
304 }
305 _ => {}
306 }
307 }
308 }
309
310 best_match.map(|(name, _)| name)
311 }
312
313 fn edit_distance(&self, s1: &str, s2: &str) -> usize {
315 let len1 = s1.len();
316 let len2 = s2.len();
317 let mut matrix = vec![vec![0; len2 + 1]; len1 + 1];
318
319 for i in 0..=len1 {
320 matrix[i][0] = i;
321 }
322 for j in 0..=len2 {
323 matrix[0][j] = j;
324 }
325
326 for (i, c1) in s1.chars().enumerate() {
327 for (j, c2) in s2.chars().enumerate() {
328 let cost = usize::from(c1 != c2);
329 matrix[i + 1][j + 1] = std::cmp::min(
330 matrix[i][j + 1] + 1, std::cmp::min(
332 matrix[i + 1][j] + 1, matrix[i][j] + cost, ),
335 );
336 }
337 }
338
339 matrix[len1][len2]
340 }
341
342 fn contains_unnest(expr: &SqlExpression) -> bool {
344 match expr {
345 SqlExpression::Unnest { .. } => true,
347 SqlExpression::FunctionCall { name, args, .. } => {
348 if name.to_uppercase() == "UNNEST" {
349 return true;
350 }
351 args.iter().any(Self::contains_unnest)
353 }
354 SqlExpression::BinaryOp { left, right, .. } => {
355 Self::contains_unnest(left) || Self::contains_unnest(right)
356 }
357 SqlExpression::Not { expr } => Self::contains_unnest(expr),
358 SqlExpression::CaseExpression {
359 when_branches,
360 else_branch,
361 } => {
362 when_branches.iter().any(|branch| {
363 Self::contains_unnest(&branch.condition)
364 || Self::contains_unnest(&branch.result)
365 }) || else_branch
366 .as_ref()
367 .map_or(false, |e| Self::contains_unnest(e))
368 }
369 SqlExpression::SimpleCaseExpression {
370 expr,
371 when_branches,
372 else_branch,
373 } => {
374 Self::contains_unnest(expr)
375 || when_branches.iter().any(|branch| {
376 Self::contains_unnest(&branch.value)
377 || Self::contains_unnest(&branch.result)
378 })
379 || else_branch
380 .as_ref()
381 .map_or(false, |e| Self::contains_unnest(e))
382 }
383 SqlExpression::InList { expr, values } => {
384 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
385 }
386 SqlExpression::NotInList { expr, values } => {
387 Self::contains_unnest(expr) || values.iter().any(Self::contains_unnest)
388 }
389 SqlExpression::Between { expr, lower, upper } => {
390 Self::contains_unnest(expr)
391 || Self::contains_unnest(lower)
392 || Self::contains_unnest(upper)
393 }
394 SqlExpression::InSubquery { expr, .. } => Self::contains_unnest(expr),
395 SqlExpression::NotInSubquery { expr, .. } => Self::contains_unnest(expr),
396 SqlExpression::ScalarSubquery { .. } => false, SqlExpression::WindowFunction { args, .. } => args.iter().any(Self::contains_unnest),
398 SqlExpression::MethodCall { args, .. } => args.iter().any(Self::contains_unnest),
399 SqlExpression::ChainedMethodCall { base, args, .. } => {
400 Self::contains_unnest(base) || args.iter().any(Self::contains_unnest)
401 }
402 _ => false,
403 }
404 }
405
406 fn collect_window_specs(expr: &SqlExpression, specs: &mut Vec<WindowSpec>) {
408 match expr {
409 SqlExpression::WindowFunction {
410 window_spec, args, ..
411 } => {
412 specs.push(window_spec.clone());
414 for arg in args {
416 Self::collect_window_specs(arg, specs);
417 }
418 }
419 SqlExpression::BinaryOp { left, right, .. } => {
420 Self::collect_window_specs(left, specs);
421 Self::collect_window_specs(right, specs);
422 }
423 SqlExpression::Not { expr } => {
424 Self::collect_window_specs(expr, specs);
425 }
426 SqlExpression::FunctionCall { args, .. } => {
427 for arg in args {
428 Self::collect_window_specs(arg, specs);
429 }
430 }
431 SqlExpression::CaseExpression {
432 when_branches,
433 else_branch,
434 } => {
435 for branch in when_branches {
436 Self::collect_window_specs(&branch.condition, specs);
437 Self::collect_window_specs(&branch.result, specs);
438 }
439 if let Some(else_expr) = else_branch {
440 Self::collect_window_specs(else_expr, specs);
441 }
442 }
443 SqlExpression::SimpleCaseExpression {
444 expr,
445 when_branches,
446 else_branch,
447 } => {
448 Self::collect_window_specs(expr, specs);
449 for branch in when_branches {
450 Self::collect_window_specs(&branch.value, specs);
451 Self::collect_window_specs(&branch.result, specs);
452 }
453 if let Some(else_expr) = else_branch {
454 Self::collect_window_specs(else_expr, specs);
455 }
456 }
457 SqlExpression::InList { expr, values, .. } => {
458 Self::collect_window_specs(expr, specs);
459 for item in values {
460 Self::collect_window_specs(item, specs);
461 }
462 }
463 SqlExpression::ChainedMethodCall { base, args, .. } => {
464 Self::collect_window_specs(base, specs);
465 for arg in args {
466 Self::collect_window_specs(arg, specs);
467 }
468 }
469 SqlExpression::Column(_)
471 | SqlExpression::NumberLiteral(_)
472 | SqlExpression::StringLiteral(_)
473 | SqlExpression::BooleanLiteral(_)
474 | SqlExpression::Null
475 | SqlExpression::DateTimeToday { .. }
476 | SqlExpression::DateTimeConstructor { .. }
477 | SqlExpression::MethodCall { .. } => {}
478 _ => {}
480 }
481 }
482
483 fn contains_window_function(expr: &SqlExpression) -> bool {
485 match expr {
486 SqlExpression::WindowFunction { .. } => true,
487 SqlExpression::BinaryOp { left, right, .. } => {
488 Self::contains_window_function(left) || Self::contains_window_function(right)
489 }
490 SqlExpression::Not { expr } => Self::contains_window_function(expr),
491 SqlExpression::FunctionCall { args, .. } => {
492 args.iter().any(Self::contains_window_function)
493 }
494 SqlExpression::CaseExpression {
495 when_branches,
496 else_branch,
497 } => {
498 when_branches.iter().any(|branch| {
499 Self::contains_window_function(&branch.condition)
500 || Self::contains_window_function(&branch.result)
501 }) || else_branch
502 .as_ref()
503 .map_or(false, |e| Self::contains_window_function(e))
504 }
505 SqlExpression::SimpleCaseExpression {
506 expr,
507 when_branches,
508 else_branch,
509 } => {
510 Self::contains_window_function(expr)
511 || when_branches.iter().any(|branch| {
512 Self::contains_window_function(&branch.value)
513 || Self::contains_window_function(&branch.result)
514 })
515 || else_branch
516 .as_ref()
517 .map_or(false, |e| Self::contains_window_function(e))
518 }
519 SqlExpression::InList { expr, values } => {
520 Self::contains_window_function(expr)
521 || values.iter().any(Self::contains_window_function)
522 }
523 SqlExpression::NotInList { expr, values } => {
524 Self::contains_window_function(expr)
525 || values.iter().any(Self::contains_window_function)
526 }
527 SqlExpression::Between { expr, lower, upper } => {
528 Self::contains_window_function(expr)
529 || Self::contains_window_function(lower)
530 || Self::contains_window_function(upper)
531 }
532 SqlExpression::InSubquery { expr, .. } => Self::contains_window_function(expr),
533 SqlExpression::NotInSubquery { expr, .. } => Self::contains_window_function(expr),
534 SqlExpression::MethodCall { args, .. } => {
535 args.iter().any(Self::contains_window_function)
536 }
537 SqlExpression::ChainedMethodCall { base, args, .. } => {
538 Self::contains_window_function(base)
539 || args.iter().any(Self::contains_window_function)
540 }
541 _ => false,
542 }
543 }
544
545 fn extract_window_specs(
547 items: &[SelectItem],
548 ) -> Vec<crate::data::batch_window_evaluator::WindowFunctionSpec> {
549 let mut specs = Vec::new();
550 for (idx, item) in items.iter().enumerate() {
551 if let SelectItem::Expression { expr, .. } = item {
552 Self::collect_window_function_specs(expr, idx, &mut specs);
553 }
554 }
555 specs
556 }
557
558 fn collect_window_function_specs(
560 expr: &SqlExpression,
561 output_column_index: usize,
562 specs: &mut Vec<crate::data::batch_window_evaluator::WindowFunctionSpec>,
563 ) {
564 match expr {
565 SqlExpression::WindowFunction {
566 name,
567 args,
568 window_spec,
569 } => {
570 specs.push(crate::data::batch_window_evaluator::WindowFunctionSpec {
571 spec: window_spec.clone(),
572 function_name: name.clone(),
573 args: args.clone(),
574 output_column_index,
575 });
576 }
577 SqlExpression::BinaryOp { left, right, .. } => {
578 Self::collect_window_function_specs(left, output_column_index, specs);
579 Self::collect_window_function_specs(right, output_column_index, specs);
580 }
581 SqlExpression::Not { expr } => {
582 Self::collect_window_function_specs(expr, output_column_index, specs);
583 }
584 SqlExpression::FunctionCall { args, .. } => {
585 for arg in args {
586 Self::collect_window_function_specs(arg, output_column_index, specs);
587 }
588 }
589 SqlExpression::CaseExpression {
590 when_branches,
591 else_branch,
592 } => {
593 for branch in when_branches {
594 Self::collect_window_function_specs(
595 &branch.condition,
596 output_column_index,
597 specs,
598 );
599 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
600 }
601 if let Some(e) = else_branch {
602 Self::collect_window_function_specs(e, output_column_index, specs);
603 }
604 }
605 SqlExpression::SimpleCaseExpression {
606 expr,
607 when_branches,
608 else_branch,
609 } => {
610 Self::collect_window_function_specs(expr, output_column_index, specs);
611 for branch in when_branches {
612 Self::collect_window_function_specs(&branch.value, output_column_index, specs);
613 Self::collect_window_function_specs(&branch.result, output_column_index, specs);
614 }
615 if let Some(e) = else_branch {
616 Self::collect_window_function_specs(e, output_column_index, specs);
617 }
618 }
619 SqlExpression::InList { expr, values } => {
620 Self::collect_window_function_specs(expr, output_column_index, specs);
621 for val in values {
622 Self::collect_window_function_specs(val, output_column_index, specs);
623 }
624 }
625 SqlExpression::NotInList { expr, values } => {
626 Self::collect_window_function_specs(expr, output_column_index, specs);
627 for val in values {
628 Self::collect_window_function_specs(val, output_column_index, specs);
629 }
630 }
631 SqlExpression::Between { expr, lower, upper } => {
632 Self::collect_window_function_specs(expr, output_column_index, specs);
633 Self::collect_window_function_specs(lower, output_column_index, specs);
634 Self::collect_window_function_specs(upper, output_column_index, specs);
635 }
636 SqlExpression::InSubquery { expr, .. } => {
637 Self::collect_window_function_specs(expr, output_column_index, specs);
638 }
639 SqlExpression::NotInSubquery { expr, .. } => {
640 Self::collect_window_function_specs(expr, output_column_index, specs);
641 }
642 SqlExpression::MethodCall { args, .. } => {
643 for arg in args {
644 Self::collect_window_function_specs(arg, output_column_index, specs);
645 }
646 }
647 SqlExpression::ChainedMethodCall { base, args, .. } => {
648 Self::collect_window_function_specs(base, output_column_index, specs);
649 for arg in args {
650 Self::collect_window_function_specs(arg, output_column_index, specs);
651 }
652 }
653 _ => {} }
655 }
656
657 pub fn execute(&self, table: Arc<DataTable>, sql: &str) -> Result<DataView> {
659 let (view, _plan) = self.execute_with_plan(table, sql)?;
660 Ok(view)
661 }
662
663 pub fn execute_with_temp_tables(
665 &self,
666 table: Arc<DataTable>,
667 sql: &str,
668 temp_tables: Option<&TempTableRegistry>,
669 ) -> Result<DataView> {
670 let (view, _plan) = self.execute_with_plan_and_temp_tables(table, sql, temp_tables)?;
671 Ok(view)
672 }
673
674 pub fn execute_statement(
676 &self,
677 table: Arc<DataTable>,
678 statement: SelectStatement,
679 ) -> Result<DataView> {
680 self.execute_statement_with_temp_tables(table, statement, None)
681 }
682
683 pub fn execute_statement_with_temp_tables(
685 &self,
686 table: Arc<DataTable>,
687 statement: SelectStatement,
688 temp_tables: Option<&TempTableRegistry>,
689 ) -> Result<DataView> {
690 let mut cte_context = HashMap::new();
692
693 if let Some(temp_registry) = temp_tables {
695 for table_name in temp_registry.list_tables() {
696 if let Some(temp_table) = temp_registry.get(&table_name) {
697 debug!("Adding temp table {} to CTE context", table_name);
698 let view = DataView::new(temp_table);
699 cte_context.insert(table_name, Arc::new(view));
700 }
701 }
702 }
703
704 for cte in &statement.ctes {
705 debug!("QueryEngine: Pre-processing CTE '{}'...", cte.name);
706 let cte_result = match &cte.cte_type {
708 CTEType::Standard(query) => {
709 let view = self.build_view_with_context(
711 table.clone(),
712 query.clone(),
713 &mut cte_context,
714 )?;
715
716 let mut materialized = self.materialize_view(view)?;
718
719 for column in materialized.columns_mut() {
721 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
722 column.source_table = Some(cte.name.clone());
723 }
724
725 DataView::new(Arc::new(materialized))
726 }
727 CTEType::Web(web_spec) => {
728 use crate::web::http_fetcher::WebDataFetcher;
730
731 let fetcher = WebDataFetcher::new()?;
732 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
734
735 for column in data_table.columns_mut() {
737 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
738 column.source_table = Some(cte.name.clone());
739 }
740
741 DataView::new(Arc::new(data_table))
743 }
744 };
745 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
747 debug!(
748 "QueryEngine: CTE '{}' pre-processed, stored in context",
749 cte.name
750 );
751 }
752
753 let mut subquery_executor =
755 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
756 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
757
758 self.build_view_with_context(table, processed_statement, &mut cte_context)
760 }
761
762 pub fn execute_statement_with_cte_context(
764 &self,
765 table: Arc<DataTable>,
766 statement: SelectStatement,
767 cte_context: &HashMap<String, Arc<DataView>>,
768 ) -> Result<DataView> {
769 let mut local_context = cte_context.clone();
771
772 for cte in &statement.ctes {
774 debug!("QueryEngine: Processing nested CTE '{}'...", cte.name);
775 let cte_result = match &cte.cte_type {
776 CTEType::Standard(query) => {
777 let view = self.build_view_with_context(
778 table.clone(),
779 query.clone(),
780 &mut local_context,
781 )?;
782
783 let mut materialized = self.materialize_view(view)?;
785
786 for column in materialized.columns_mut() {
788 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
789 column.source_table = Some(cte.name.clone());
790 }
791
792 DataView::new(Arc::new(materialized))
793 }
794 CTEType::Web(web_spec) => {
795 use crate::web::http_fetcher::WebDataFetcher;
797
798 let fetcher = WebDataFetcher::new()?;
799 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
801
802 for column in data_table.columns_mut() {
804 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
805 column.source_table = Some(cte.name.clone());
806 }
807
808 DataView::new(Arc::new(data_table))
810 }
811 };
812 local_context.insert(cte.name.clone(), Arc::new(cte_result));
813 }
814
815 let mut subquery_executor =
817 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), local_context.clone());
818 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
819
820 self.build_view_with_context(table, processed_statement, &mut local_context)
822 }
823
824 pub fn execute_with_plan(
826 &self,
827 table: Arc<DataTable>,
828 sql: &str,
829 ) -> Result<(DataView, ExecutionPlan)> {
830 self.execute_with_plan_and_temp_tables(table, sql, None)
831 }
832
833 pub fn execute_with_plan_and_temp_tables(
835 &self,
836 table: Arc<DataTable>,
837 sql: &str,
838 temp_tables: Option<&TempTableRegistry>,
839 ) -> Result<(DataView, ExecutionPlan)> {
840 let mut plan_builder = ExecutionPlanBuilder::new();
841 let start_time = Instant::now();
842
843 plan_builder.begin_step(StepType::Parse, "Parse SQL query".to_string());
845 plan_builder.add_detail(format!("Query: {}", sql));
846 let mut parser = Parser::new(sql);
847 let statement = parser
848 .parse()
849 .map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
850 plan_builder.add_detail(format!("Parsed successfully"));
851 if let Some(from) = &statement.from_table {
852 plan_builder.add_detail(format!("FROM: {}", from));
853 }
854 if statement.where_clause.is_some() {
855 plan_builder.add_detail("WHERE clause present".to_string());
856 }
857 plan_builder.end_step();
858
859 let mut cte_context = HashMap::new();
861
862 if let Some(temp_registry) = temp_tables {
864 for table_name in temp_registry.list_tables() {
865 if let Some(temp_table) = temp_registry.get(&table_name) {
866 debug!("Adding temp table {} to CTE context", table_name);
867 let view = DataView::new(temp_table);
868 cte_context.insert(table_name, Arc::new(view));
869 }
870 }
871 }
872
873 if !statement.ctes.is_empty() {
874 plan_builder.begin_step(
875 StepType::CTE,
876 format!("Process {} CTEs", statement.ctes.len()),
877 );
878
879 for cte in &statement.ctes {
880 let cte_start = Instant::now();
881 plan_builder.begin_step(StepType::CTE, format!("CTE '{}'", cte.name));
882
883 let cte_result = match &cte.cte_type {
884 CTEType::Standard(query) => {
885 if let Some(from) = &query.from_table {
887 plan_builder.add_detail(format!("Source: {}", from));
888 }
889 if query.where_clause.is_some() {
890 plan_builder.add_detail("Has WHERE clause".to_string());
891 }
892 if query.group_by.is_some() {
893 plan_builder.add_detail("Has GROUP BY".to_string());
894 }
895
896 debug!(
897 "QueryEngine: Processing CTE '{}' with existing context: {:?}",
898 cte.name,
899 cte_context.keys().collect::<Vec<_>>()
900 );
901
902 let mut subquery_executor = SubqueryExecutor::with_cte_context(
905 self.clone(),
906 table.clone(),
907 cte_context.clone(),
908 );
909 let processed_query = subquery_executor.execute_subqueries(query)?;
910
911 let view = self.build_view_with_context(
912 table.clone(),
913 processed_query,
914 &mut cte_context,
915 )?;
916
917 let mut materialized = self.materialize_view(view)?;
919
920 for column in materialized.columns_mut() {
922 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
923 column.source_table = Some(cte.name.clone());
924 }
925
926 DataView::new(Arc::new(materialized))
927 }
928 CTEType::Web(web_spec) => {
929 plan_builder.add_detail(format!("URL: {}", web_spec.url));
930 if let Some(format) = &web_spec.format {
931 plan_builder.add_detail(format!("Format: {:?}", format));
932 }
933 if let Some(cache) = web_spec.cache_seconds {
934 plan_builder.add_detail(format!("Cache: {} seconds", cache));
935 }
936
937 use crate::web::http_fetcher::WebDataFetcher;
939
940 let fetcher = WebDataFetcher::new()?;
941 let mut data_table = fetcher.fetch(web_spec, &cte.name, None)?;
943
944 for column in data_table.columns_mut() {
946 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
947 column.source_table = Some(cte.name.clone());
948 }
949
950 DataView::new(Arc::new(data_table))
952 }
953 };
954
955 plan_builder.set_rows_out(cte_result.row_count());
957 plan_builder.add_detail(format!(
958 "Result: {} rows, {} columns",
959 cte_result.row_count(),
960 cte_result.column_count()
961 ));
962 plan_builder.add_detail(format!(
963 "Execution time: {:.3}ms",
964 cte_start.elapsed().as_secs_f64() * 1000.0
965 ));
966
967 debug!(
968 "QueryEngine: Storing CTE '{}' in context with {} rows",
969 cte.name,
970 cte_result.row_count()
971 );
972 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
973 plan_builder.end_step();
974 }
975
976 plan_builder.add_detail(format!(
977 "All {} CTEs cached in context",
978 statement.ctes.len()
979 ));
980 plan_builder.end_step();
981 }
982
983 plan_builder.begin_step(StepType::Subquery, "Process subqueries".to_string());
985 let mut subquery_executor =
986 SubqueryExecutor::with_cte_context(self.clone(), table.clone(), cte_context.clone());
987
988 let has_subqueries = statement.where_clause.as_ref().map_or(false, |w| {
990 format!("{:?}", w).contains("Subquery")
992 });
993
994 if has_subqueries {
995 plan_builder.add_detail("Evaluating subqueries in WHERE clause".to_string());
996 }
997
998 let processed_statement = subquery_executor.execute_subqueries(&statement)?;
999
1000 if has_subqueries {
1001 plan_builder.add_detail("Subqueries replaced with materialized values".to_string());
1002 } else {
1003 plan_builder.add_detail("No subqueries to process".to_string());
1004 }
1005
1006 plan_builder.end_step();
1007 let result = self.build_view_with_context_and_plan(
1008 table,
1009 processed_statement,
1010 &mut cte_context,
1011 &mut plan_builder,
1012 )?;
1013
1014 let total_duration = start_time.elapsed();
1015 info!(
1016 "Query execution complete: total={:?}, rows={}",
1017 total_duration,
1018 result.row_count()
1019 );
1020
1021 let plan = plan_builder.build();
1022 Ok((result, plan))
1023 }
1024
1025 fn build_view(&self, table: Arc<DataTable>, statement: SelectStatement) -> Result<DataView> {
1027 let mut cte_context = HashMap::new();
1028 self.build_view_with_context(table, statement, &mut cte_context)
1029 }
1030
1031 fn build_view_with_context(
1033 &self,
1034 table: Arc<DataTable>,
1035 statement: SelectStatement,
1036 cte_context: &mut HashMap<String, Arc<DataView>>,
1037 ) -> Result<DataView> {
1038 let mut dummy_plan = ExecutionPlanBuilder::new();
1039 let mut exec_context = ExecutionContext::new();
1040 self.build_view_with_context_and_plan_and_exec(
1041 table,
1042 statement,
1043 cte_context,
1044 &mut dummy_plan,
1045 &mut exec_context,
1046 )
1047 }
1048
1049 fn build_view_with_context_and_plan(
1051 &self,
1052 table: Arc<DataTable>,
1053 statement: SelectStatement,
1054 cte_context: &mut HashMap<String, Arc<DataView>>,
1055 plan: &mut ExecutionPlanBuilder,
1056 ) -> Result<DataView> {
1057 let mut exec_context = ExecutionContext::new();
1058 self.build_view_with_context_and_plan_and_exec(
1059 table,
1060 statement,
1061 cte_context,
1062 plan,
1063 &mut exec_context,
1064 )
1065 }
1066
1067 fn build_view_with_context_and_plan_and_exec(
1069 &self,
1070 table: Arc<DataTable>,
1071 statement: SelectStatement,
1072 cte_context: &mut HashMap<String, Arc<DataView>>,
1073 plan: &mut ExecutionPlanBuilder,
1074 exec_context: &mut ExecutionContext,
1075 ) -> Result<DataView> {
1076 for cte in &statement.ctes {
1078 if cte_context.contains_key(&cte.name) {
1080 debug!(
1081 "QueryEngine: CTE '{}' already in context, skipping",
1082 cte.name
1083 );
1084 continue;
1085 }
1086
1087 debug!("QueryEngine: Processing CTE '{}'...", cte.name);
1088 debug!(
1089 "QueryEngine: Available CTEs for '{}': {:?}",
1090 cte.name,
1091 cte_context.keys().collect::<Vec<_>>()
1092 );
1093
1094 let cte_result = match &cte.cte_type {
1096 CTEType::Standard(query) => {
1097 let view =
1098 self.build_view_with_context(table.clone(), query.clone(), cte_context)?;
1099
1100 let mut materialized = self.materialize_view(view)?;
1102
1103 for column in materialized.columns_mut() {
1105 column.qualified_name = Some(format!("{}.{}", cte.name, column.name));
1106 column.source_table = Some(cte.name.clone());
1107 }
1108
1109 DataView::new(Arc::new(materialized))
1110 }
1111 CTEType::Web(_web_spec) => {
1112 return Err(anyhow!(
1114 "Web CTEs should be processed in execute_select method"
1115 ));
1116 }
1117 };
1118
1119 cte_context.insert(cte.name.clone(), Arc::new(cte_result));
1121 debug!(
1122 "QueryEngine: CTE '{}' processed, stored in context",
1123 cte.name
1124 );
1125 }
1126
1127 let source_table = if let Some(ref table_func) = statement.from_function {
1129 debug!("QueryEngine: Processing table function...");
1131 match table_func {
1132 TableFunction::Generator { name, args } => {
1133 use crate::sql::generators::GeneratorRegistry;
1135
1136 let registry = GeneratorRegistry::new();
1138
1139 if let Some(generator) = registry.get(name) {
1140 let mut evaluator = ArithmeticEvaluator::with_date_notation(
1142 &table,
1143 self.date_notation.clone(),
1144 );
1145 let dummy_row = 0;
1146
1147 let mut evaluated_args = Vec::new();
1148 for arg in args {
1149 evaluated_args.push(evaluator.evaluate(arg, dummy_row)?);
1150 }
1151
1152 generator.generate(evaluated_args)?
1154 } else {
1155 return Err(anyhow!("Unknown generator function: {}", name));
1156 }
1157 }
1158 }
1159 } else if let Some(ref subquery) = statement.from_subquery {
1160 debug!("QueryEngine: Processing FROM subquery...");
1162 let subquery_result =
1163 self.build_view_with_context(table.clone(), *subquery.clone(), cte_context)?;
1164
1165 let materialized = self.materialize_view(subquery_result)?;
1168 Arc::new(materialized)
1169 } else if let Some(ref table_name) = statement.from_table {
1170 if let Some(cte_view) = cte_context.get(table_name) {
1172 debug!("QueryEngine: Using CTE '{}' as source table", table_name);
1173 let mut materialized = self.materialize_view((**cte_view).clone())?;
1175
1176 if let Some(ref alias) = statement.from_alias {
1178 debug!(
1179 "QueryEngine: Applying alias '{}' to CTE '{}' qualified column names",
1180 alias, table_name
1181 );
1182 for column in materialized.columns_mut() {
1183 if let Some(ref qualified_name) = column.qualified_name {
1185 if qualified_name.starts_with(&format!("{}.", table_name)) {
1186 column.qualified_name =
1187 Some(qualified_name.replace(
1188 &format!("{}.", table_name),
1189 &format!("{}.", alias),
1190 ));
1191 }
1192 }
1193 if column.source_table.as_ref() == Some(table_name) {
1195 column.source_table = Some(alias.clone());
1196 }
1197 }
1198 }
1199
1200 Arc::new(materialized)
1201 } else {
1202 table.clone()
1204 }
1205 } else {
1206 table.clone()
1208 };
1209
1210 if let Some(ref alias) = statement.from_alias {
1212 if let Some(ref table_name) = statement.from_table {
1213 exec_context.register_alias(alias.clone(), table_name.clone());
1214 }
1215 }
1216
1217 let final_table = if !statement.joins.is_empty() {
1219 plan.begin_step(
1220 StepType::Join,
1221 format!("Process {} JOINs", statement.joins.len()),
1222 );
1223 plan.set_rows_in(source_table.row_count());
1224
1225 let join_executor = HashJoinExecutor::new(self.case_insensitive);
1226 let mut current_table = source_table;
1227
1228 for (idx, join_clause) in statement.joins.iter().enumerate() {
1229 let join_start = Instant::now();
1230 plan.begin_step(StepType::Join, format!("JOIN #{}", idx + 1));
1231 plan.add_detail(format!("Type: {:?}", join_clause.join_type));
1232 plan.add_detail(format!("Left table: {} rows", current_table.row_count()));
1233 plan.add_detail(format!(
1234 "Executing {:?} JOIN on {} condition(s)",
1235 join_clause.join_type,
1236 join_clause.condition.conditions.len()
1237 ));
1238
1239 let right_table = match &join_clause.table {
1241 TableSource::Table(name) => {
1242 if let Some(cte_view) = cte_context.get(name) {
1244 let mut materialized = self.materialize_view((**cte_view).clone())?;
1245
1246 if let Some(ref alias) = join_clause.alias {
1248 debug!("QueryEngine: Applying JOIN alias '{}' to CTE '{}' qualified column names", alias, name);
1249 for column in materialized.columns_mut() {
1250 if let Some(ref qualified_name) = column.qualified_name {
1252 if qualified_name.starts_with(&format!("{}.", name)) {
1253 column.qualified_name = Some(qualified_name.replace(
1254 &format!("{}.", name),
1255 &format!("{}.", alias),
1256 ));
1257 }
1258 }
1259 if column.source_table.as_ref() == Some(name) {
1261 column.source_table = Some(alias.clone());
1262 }
1263 }
1264 }
1265
1266 Arc::new(materialized)
1267 } else {
1268 return Err(anyhow!("Cannot resolve table '{}' for JOIN", name));
1271 }
1272 }
1273 TableSource::DerivedTable { query, alias: _ } => {
1274 let subquery_result = self.build_view_with_context(
1276 table.clone(),
1277 *query.clone(),
1278 cte_context,
1279 )?;
1280 let materialized = self.materialize_view(subquery_result)?;
1281 Arc::new(materialized)
1282 }
1283 };
1284
1285 let joined = join_executor.execute_join(
1287 current_table.clone(),
1288 join_clause,
1289 right_table.clone(),
1290 )?;
1291
1292 plan.add_detail(format!("Right table: {} rows", right_table.row_count()));
1293 plan.set_rows_out(joined.row_count());
1294 plan.add_detail(format!("Result: {} rows", joined.row_count()));
1295 plan.add_detail(format!(
1296 "Join time: {:.3}ms",
1297 join_start.elapsed().as_secs_f64() * 1000.0
1298 ));
1299 plan.end_step();
1300
1301 current_table = Arc::new(joined);
1302 }
1303
1304 plan.set_rows_out(current_table.row_count());
1305 plan.add_detail(format!(
1306 "Final result after all joins: {} rows",
1307 current_table.row_count()
1308 ));
1309 plan.end_step();
1310 current_table
1311 } else {
1312 source_table
1313 };
1314
1315 self.build_view_internal_with_plan_and_exec(
1317 final_table,
1318 statement,
1319 plan,
1320 Some(exec_context),
1321 )
1322 }
1323
1324 pub fn materialize_view(&self, view: DataView) -> Result<DataTable> {
1326 let source = view.source();
1327 let mut result_table = DataTable::new("derived");
1328
1329 let visible_cols = view.visible_column_indices().to_vec();
1331
1332 for col_idx in &visible_cols {
1334 let col = &source.columns[*col_idx];
1335 let new_col = DataColumn {
1336 name: col.name.clone(),
1337 data_type: col.data_type.clone(),
1338 nullable: col.nullable,
1339 unique_values: col.unique_values,
1340 null_count: col.null_count,
1341 metadata: col.metadata.clone(),
1342 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), };
1345 result_table.add_column(new_col);
1346 }
1347
1348 for row_idx in view.visible_row_indices() {
1350 let source_row = &source.rows[*row_idx];
1351 let mut new_row = DataRow { values: Vec::new() };
1352
1353 for col_idx in &visible_cols {
1354 new_row.values.push(source_row.values[*col_idx].clone());
1355 }
1356
1357 result_table.add_row(new_row);
1358 }
1359
1360 Ok(result_table)
1361 }
1362
1363 fn build_view_internal(
1364 &self,
1365 table: Arc<DataTable>,
1366 statement: SelectStatement,
1367 ) -> Result<DataView> {
1368 let mut dummy_plan = ExecutionPlanBuilder::new();
1369 self.build_view_internal_with_plan(table, statement, &mut dummy_plan)
1370 }
1371
1372 fn build_view_internal_with_plan(
1373 &self,
1374 table: Arc<DataTable>,
1375 statement: SelectStatement,
1376 plan: &mut ExecutionPlanBuilder,
1377 ) -> Result<DataView> {
1378 self.build_view_internal_with_plan_and_exec(table, statement, plan, None)
1379 }
1380
1381 fn build_view_internal_with_plan_and_exec(
1382 &self,
1383 table: Arc<DataTable>,
1384 statement: SelectStatement,
1385 plan: &mut ExecutionPlanBuilder,
1386 exec_context: Option<&ExecutionContext>,
1387 ) -> Result<DataView> {
1388 debug!(
1389 "QueryEngine::build_view - select_items: {:?}",
1390 statement.select_items
1391 );
1392 debug!(
1393 "QueryEngine::build_view - where_clause: {:?}",
1394 statement.where_clause
1395 );
1396
1397 let mut visible_rows: Vec<usize> = (0..table.row_count()).collect();
1399
1400 if let Some(where_clause) = &statement.where_clause {
1402 let total_rows = table.row_count();
1403 debug!("QueryEngine: Applying WHERE clause to {} rows", total_rows);
1404 debug!("QueryEngine: WHERE clause = {:?}", where_clause);
1405
1406 plan.begin_step(StepType::Filter, "WHERE clause filtering".to_string());
1407 plan.set_rows_in(total_rows);
1408 plan.add_detail(format!("Input: {} rows", total_rows));
1409
1410 for condition in &where_clause.conditions {
1412 plan.add_detail(format!("Condition: {:?}", condition.expr));
1413 }
1414
1415 let filter_start = Instant::now();
1416 let mut eval_context = EvaluationContext::new(self.case_insensitive);
1418
1419 let mut evaluator = if let Some(exec_ctx) = exec_context {
1421 RecursiveWhereEvaluator::with_both_contexts(&table, &mut eval_context, exec_ctx)
1423 } else {
1424 RecursiveWhereEvaluator::with_context(&table, &mut eval_context)
1425 };
1426
1427 let mut filtered_rows = Vec::new();
1429 for row_idx in visible_rows {
1430 if row_idx < 3 {
1432 debug!("QueryEngine: Evaluating WHERE clause for row {}", row_idx);
1433 }
1434
1435 match evaluator.evaluate(where_clause, row_idx) {
1436 Ok(result) => {
1437 if row_idx < 3 {
1438 debug!("QueryEngine: Row {} WHERE result: {}", row_idx, result);
1439 }
1440 if result {
1441 filtered_rows.push(row_idx);
1442 }
1443 }
1444 Err(e) => {
1445 if row_idx < 3 {
1446 debug!(
1447 "QueryEngine: WHERE evaluation error for row {}: {}",
1448 row_idx, e
1449 );
1450 }
1451 return Err(e);
1453 }
1454 }
1455 }
1456
1457 let (compilations, cache_hits) = eval_context.get_stats();
1459 if compilations > 0 || cache_hits > 0 {
1460 debug!(
1461 "LIKE pattern cache: {} compilations, {} cache hits",
1462 compilations, cache_hits
1463 );
1464 }
1465 visible_rows = filtered_rows;
1466 let filter_duration = filter_start.elapsed();
1467 info!(
1468 "WHERE clause filtering: {} rows -> {} rows in {:?}",
1469 total_rows,
1470 visible_rows.len(),
1471 filter_duration
1472 );
1473
1474 plan.set_rows_out(visible_rows.len());
1475 plan.add_detail(format!("Output: {} rows", visible_rows.len()));
1476 plan.add_detail(format!(
1477 "Filter time: {:.3}ms",
1478 filter_duration.as_secs_f64() * 1000.0
1479 ));
1480 plan.end_step();
1481 }
1482
1483 let mut view = DataView::new(table.clone());
1485 view = view.with_rows(visible_rows);
1486
1487 if let Some(group_by_exprs) = &statement.group_by {
1489 if !group_by_exprs.is_empty() {
1490 debug!("QueryEngine: Processing GROUP BY: {:?}", group_by_exprs);
1491
1492 plan.begin_step(
1493 StepType::GroupBy,
1494 format!("GROUP BY {} expressions", group_by_exprs.len()),
1495 );
1496 plan.set_rows_in(view.row_count());
1497 plan.add_detail(format!("Input: {} rows", view.row_count()));
1498 for expr in group_by_exprs {
1499 plan.add_detail(format!("Group by: {:?}", expr));
1500 }
1501
1502 let group_start = Instant::now();
1503 view = self.apply_group_by(
1504 view,
1505 group_by_exprs,
1506 &statement.select_items,
1507 statement.having.as_ref(),
1508 plan,
1509 )?;
1510
1511 plan.set_rows_out(view.row_count());
1512 plan.add_detail(format!("Output: {} groups", view.row_count()));
1513 plan.add_detail(format!(
1514 "Overall time: {:.3}ms",
1515 group_start.elapsed().as_secs_f64() * 1000.0
1516 ));
1517 plan.end_step();
1518 }
1519 } else {
1520 if !statement.select_items.is_empty() {
1522 let has_non_star_items = statement
1524 .select_items
1525 .iter()
1526 .any(|item| !matches!(item, SelectItem::Star { .. }));
1527
1528 if has_non_star_items || statement.select_items.len() > 1 {
1532 view = self.apply_select_items(
1533 view,
1534 &statement.select_items,
1535 &statement,
1536 exec_context,
1537 plan,
1538 )?;
1539 }
1540 } else if !statement.columns.is_empty() && statement.columns[0] != "*" {
1542 debug!("QueryEngine: Using legacy columns path");
1543 let source_table = view.source();
1546 let column_indices =
1547 self.resolve_column_indices(source_table, &statement.columns)?;
1548 view = view.with_columns(column_indices);
1549 }
1550 }
1551
1552 if statement.distinct {
1554 plan.begin_step(StepType::Distinct, "Remove duplicate rows".to_string());
1555 plan.set_rows_in(view.row_count());
1556 plan.add_detail(format!("Input: {} rows", view.row_count()));
1557
1558 let distinct_start = Instant::now();
1559 view = self.apply_distinct(view)?;
1560
1561 plan.set_rows_out(view.row_count());
1562 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1563 plan.add_detail(format!(
1564 "Distinct time: {:.3}ms",
1565 distinct_start.elapsed().as_secs_f64() * 1000.0
1566 ));
1567 plan.end_step();
1568 }
1569
1570 if let Some(order_by_columns) = &statement.order_by {
1572 if !order_by_columns.is_empty() {
1573 plan.begin_step(
1574 StepType::Sort,
1575 format!("ORDER BY {} columns", order_by_columns.len()),
1576 );
1577 plan.set_rows_in(view.row_count());
1578 for col in order_by_columns {
1579 let expr_str = match &col.expr {
1581 SqlExpression::Column(col_ref) => col_ref.name.clone(),
1582 _ => "expr".to_string(),
1583 };
1584 plan.add_detail(format!("{} {:?}", expr_str, col.direction));
1585 }
1586
1587 let sort_start = Instant::now();
1588 view =
1589 self.apply_multi_order_by_with_context(view, order_by_columns, exec_context)?;
1590
1591 plan.add_detail(format!(
1592 "Sort time: {:.3}ms",
1593 sort_start.elapsed().as_secs_f64() * 1000.0
1594 ));
1595 plan.end_step();
1596 }
1597 }
1598
1599 if let Some(limit) = statement.limit {
1601 let offset = statement.offset.unwrap_or(0);
1602 plan.begin_step(StepType::Limit, format!("LIMIT {}", limit));
1603 plan.set_rows_in(view.row_count());
1604 if offset > 0 {
1605 plan.add_detail(format!("OFFSET: {}", offset));
1606 }
1607 view = view.with_limit(limit, offset);
1608 plan.set_rows_out(view.row_count());
1609 plan.add_detail(format!("Output: {} rows", view.row_count()));
1610 plan.end_step();
1611 }
1612
1613 if !statement.set_operations.is_empty() {
1615 plan.begin_step(
1616 StepType::SetOperation,
1617 format!("Process {} set operations", statement.set_operations.len()),
1618 );
1619 plan.set_rows_in(view.row_count());
1620
1621 let mut combined_table = self.materialize_view(view)?;
1623 let first_columns = combined_table.column_names();
1624 let first_column_count = first_columns.len();
1625
1626 let mut needs_deduplication = false;
1628
1629 for (idx, (operation, next_statement)) in statement.set_operations.iter().enumerate() {
1631 let op_start = Instant::now();
1632 plan.begin_step(
1633 StepType::SetOperation,
1634 format!("{:?} operation #{}", operation, idx + 1),
1635 );
1636
1637 let next_view = if let Some(exec_ctx) = exec_context {
1640 self.build_view_internal_with_plan_and_exec(
1641 table.clone(),
1642 *next_statement.clone(),
1643 plan,
1644 Some(exec_ctx),
1645 )?
1646 } else {
1647 self.build_view_internal_with_plan(
1648 table.clone(),
1649 *next_statement.clone(),
1650 plan,
1651 )?
1652 };
1653
1654 let next_table = self.materialize_view(next_view)?;
1656 let next_columns = next_table.column_names();
1657 let next_column_count = next_columns.len();
1658
1659 if first_column_count != next_column_count {
1661 return Err(anyhow!(
1662 "UNION queries must have the same number of columns: first query has {} columns, but query #{} has {} columns",
1663 first_column_count,
1664 idx + 2,
1665 next_column_count
1666 ));
1667 }
1668
1669 for (col_idx, (first_col, next_col)) in
1671 first_columns.iter().zip(next_columns.iter()).enumerate()
1672 {
1673 if !first_col.eq_ignore_ascii_case(next_col) {
1674 debug!(
1675 "UNION column name mismatch at position {}: '{}' vs '{}' (using first query's name)",
1676 col_idx + 1,
1677 first_col,
1678 next_col
1679 );
1680 }
1681 }
1682
1683 plan.add_detail(format!("Left: {} rows", combined_table.row_count()));
1684 plan.add_detail(format!("Right: {} rows", next_table.row_count()));
1685
1686 match operation {
1688 SetOperation::UnionAll => {
1689 for row in next_table.rows.iter() {
1691 combined_table.add_row(row.clone());
1692 }
1693 plan.add_detail(format!(
1694 "Result: {} rows (no deduplication)",
1695 combined_table.row_count()
1696 ));
1697 }
1698 SetOperation::Union => {
1699 for row in next_table.rows.iter() {
1701 combined_table.add_row(row.clone());
1702 }
1703 needs_deduplication = true;
1704 plan.add_detail(format!(
1705 "Combined: {} rows (deduplication pending)",
1706 combined_table.row_count()
1707 ));
1708 }
1709 SetOperation::Intersect => {
1710 return Err(anyhow!("INTERSECT is not yet implemented"));
1713 }
1714 SetOperation::Except => {
1715 return Err(anyhow!("EXCEPT is not yet implemented"));
1718 }
1719 }
1720
1721 plan.add_detail(format!(
1722 "Operation time: {:.3}ms",
1723 op_start.elapsed().as_secs_f64() * 1000.0
1724 ));
1725 plan.set_rows_out(combined_table.row_count());
1726 plan.end_step();
1727 }
1728
1729 plan.set_rows_out(combined_table.row_count());
1730 plan.add_detail(format!(
1731 "Combined result: {} rows after {} operations",
1732 combined_table.row_count(),
1733 statement.set_operations.len()
1734 ));
1735 plan.end_step();
1736
1737 view = DataView::new(Arc::new(combined_table));
1739
1740 if needs_deduplication {
1742 plan.begin_step(
1743 StepType::Distinct,
1744 "UNION deduplication - remove duplicate rows".to_string(),
1745 );
1746 plan.set_rows_in(view.row_count());
1747 plan.add_detail(format!("Input: {} rows", view.row_count()));
1748
1749 let distinct_start = Instant::now();
1750 view = self.apply_distinct(view)?;
1751
1752 plan.set_rows_out(view.row_count());
1753 plan.add_detail(format!("Output: {} unique rows", view.row_count()));
1754 plan.add_detail(format!(
1755 "Deduplication time: {:.3}ms",
1756 distinct_start.elapsed().as_secs_f64() * 1000.0
1757 ));
1758 plan.end_step();
1759 }
1760 }
1761
1762 Ok(view)
1763 }
1764
1765 fn resolve_column_indices(&self, table: &DataTable, columns: &[String]) -> Result<Vec<usize>> {
1767 let mut indices = Vec::new();
1768 let table_columns = table.column_names();
1769
1770 for col_name in columns {
1771 let index = table_columns
1772 .iter()
1773 .position(|c| c.eq_ignore_ascii_case(col_name))
1774 .ok_or_else(|| {
1775 let suggestion = self.find_similar_column(table, col_name);
1776 match suggestion {
1777 Some(similar) => anyhow::anyhow!(
1778 "Column '{}' not found. Did you mean '{}'?",
1779 col_name,
1780 similar
1781 ),
1782 None => anyhow::anyhow!("Column '{}' not found", col_name),
1783 }
1784 })?;
1785 indices.push(index);
1786 }
1787
1788 Ok(indices)
1789 }
1790
1791 fn apply_select_items(
1793 &self,
1794 view: DataView,
1795 select_items: &[SelectItem],
1796 _statement: &SelectStatement,
1797 exec_context: Option<&ExecutionContext>,
1798 plan: &mut ExecutionPlanBuilder,
1799 ) -> Result<DataView> {
1800 debug!(
1801 "QueryEngine::apply_select_items - items: {:?}",
1802 select_items
1803 );
1804 debug!(
1805 "QueryEngine::apply_select_items - input view has {} rows",
1806 view.row_count()
1807 );
1808
1809 let has_window_functions = select_items.iter().any(|item| match item {
1811 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1812 _ => false,
1813 });
1814
1815 let window_func_count: usize = select_items
1817 .iter()
1818 .filter(|item| match item {
1819 SelectItem::Expression { expr, .. } => Self::contains_window_function(expr),
1820 _ => false,
1821 })
1822 .count();
1823
1824 let window_start = if has_window_functions {
1826 debug!(
1827 "QueryEngine::apply_select_items - detected {} window functions",
1828 window_func_count
1829 );
1830
1831 let window_specs = Self::extract_window_specs(select_items);
1833 debug!("Extracted {} window function specs", window_specs.len());
1834
1835 Some(Instant::now())
1836 } else {
1837 None
1838 };
1839
1840 let has_unnest = select_items.iter().any(|item| match item {
1842 SelectItem::Expression { expr, .. } => Self::contains_unnest(expr),
1843 _ => false,
1844 });
1845
1846 if has_unnest {
1847 debug!("QueryEngine::apply_select_items - UNNEST detected, using row expansion");
1848 return self.apply_select_with_row_expansion(view, select_items);
1849 }
1850
1851 let has_aggregates = select_items.iter().any(|item| match item {
1855 SelectItem::Expression { expr, .. } => contains_aggregate(expr),
1856 SelectItem::Column { .. } => false,
1857 SelectItem::Star { .. } => false,
1858 SelectItem::StarExclude { .. } => false,
1859 });
1860
1861 let all_aggregate_compatible = select_items.iter().all(|item| match item {
1862 SelectItem::Expression { expr, .. } => is_aggregate_compatible(expr),
1863 SelectItem::Column { .. } => false, SelectItem::Star { .. } => false, SelectItem::StarExclude { .. } => false, });
1867
1868 if has_aggregates && all_aggregate_compatible && view.row_count() > 0 {
1869 debug!("QueryEngine::apply_select_items - detected aggregate query with constants");
1872 return self.apply_aggregate_select(view, select_items);
1873 }
1874
1875 let has_computed_expressions = select_items
1877 .iter()
1878 .any(|item| matches!(item, SelectItem::Expression { .. }));
1879
1880 debug!(
1881 "QueryEngine::apply_select_items - has_computed_expressions: {}",
1882 has_computed_expressions
1883 );
1884
1885 if !has_computed_expressions {
1886 let column_indices = self.resolve_select_columns(view.source(), select_items)?;
1888 return Ok(view.with_columns(column_indices));
1889 }
1890
1891 let source_table = view.source();
1896 let visible_rows = view.visible_row_indices();
1897
1898 let mut computed_table = DataTable::new("query_result");
1901
1902 let mut expanded_items = Vec::new();
1904 for item in select_items {
1905 match item {
1906 SelectItem::Star { table_prefix, .. } => {
1907 if let Some(prefix) = table_prefix {
1908 debug!("QueryEngine::apply_select_items - expanding {}.*", prefix);
1910 for col in &source_table.columns {
1911 if Self::column_matches_table(col, prefix) {
1912 expanded_items.push(SelectItem::Column {
1913 column: ColumnRef::unquoted(col.name.clone()),
1914 leading_comments: vec![],
1915 trailing_comment: None,
1916 });
1917 }
1918 }
1919 } else {
1920 debug!("QueryEngine::apply_select_items - expanding *");
1922 for col_name in source_table.column_names() {
1923 expanded_items.push(SelectItem::Column {
1924 column: ColumnRef::unquoted(col_name.to_string()),
1925 leading_comments: vec![],
1926 trailing_comment: None,
1927 });
1928 }
1929 }
1930 }
1931 _ => expanded_items.push(item.clone()),
1932 }
1933 }
1934
1935 let mut column_name_counts: std::collections::HashMap<String, usize> =
1937 std::collections::HashMap::new();
1938
1939 for item in &expanded_items {
1940 let base_name = match item {
1941 SelectItem::Column {
1942 column: col_ref, ..
1943 } => col_ref.name.clone(),
1944 SelectItem::Expression { alias, .. } => alias.clone(),
1945 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
1946 SelectItem::StarExclude { .. } => {
1947 unreachable!("StarExclude should have been expanded")
1948 }
1949 };
1950
1951 let count = column_name_counts.entry(base_name.clone()).or_insert(0);
1953 let column_name = if *count == 0 {
1954 base_name.clone()
1956 } else {
1957 format!("{base_name}_{count}")
1959 };
1960 *count += 1;
1961
1962 computed_table.add_column(DataColumn::new(&column_name));
1963 }
1964
1965 let can_use_batch = expanded_items.iter().all(|item| {
1969 match item {
1970 SelectItem::Expression { expr, .. } => {
1971 matches!(expr, SqlExpression::WindowFunction { .. })
1974 || !Self::contains_window_function(expr)
1975 }
1976 _ => true, }
1978 });
1979
1980 let use_batch_evaluation = can_use_batch
1983 && std::env::var("SQL_CLI_BATCH_WINDOW")
1984 .map(|v| v != "0" && v.to_lowercase() != "false")
1985 .unwrap_or(true);
1986
1987 let batch_window_specs = if use_batch_evaluation && has_window_functions {
1989 debug!("BATCH window function evaluation flag is enabled");
1990 let specs = Self::extract_window_specs(&expanded_items);
1992 debug!(
1993 "Extracted {} window function specs for batch evaluation",
1994 specs.len()
1995 );
1996 Some(specs)
1997 } else {
1998 None
1999 };
2000
2001 let mut evaluator =
2003 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2004
2005 if let Some(exec_ctx) = exec_context {
2007 let aliases = exec_ctx.get_aliases();
2008 if !aliases.is_empty() {
2009 debug!(
2010 "Applying {} aliases to evaluator: {:?}",
2011 aliases.len(),
2012 aliases
2013 );
2014 evaluator = evaluator.with_table_aliases(aliases);
2015 }
2016 }
2017
2018 if has_window_functions {
2021 let preload_start = Instant::now();
2022
2023 let mut window_specs = Vec::new();
2025 for item in &expanded_items {
2026 if let SelectItem::Expression { expr, .. } = item {
2027 Self::collect_window_specs(expr, &mut window_specs);
2028 }
2029 }
2030
2031 for spec in &window_specs {
2033 let _ = evaluator.get_or_create_window_context(spec);
2034 }
2035
2036 debug!(
2037 "Pre-created {} WindowContext(s) in {:.2}ms",
2038 window_specs.len(),
2039 preload_start.elapsed().as_secs_f64() * 1000.0
2040 );
2041 }
2042
2043 if let Some(window_specs) = batch_window_specs {
2045 debug!("Starting batch window function evaluation");
2046 let batch_start = Instant::now();
2047
2048 let mut batch_results: Vec<Vec<DataValue>> =
2050 vec![vec![DataValue::Null; expanded_items.len()]; visible_rows.len()];
2051
2052 let detailed_window_specs = &window_specs;
2054
2055 let mut specs_by_window: HashMap<
2057 u64,
2058 Vec<&crate::data::batch_window_evaluator::WindowFunctionSpec>,
2059 > = HashMap::new();
2060 for spec in detailed_window_specs {
2061 let hash = spec.spec.compute_hash();
2062 specs_by_window
2063 .entry(hash)
2064 .or_insert_with(Vec::new)
2065 .push(spec);
2066 }
2067
2068 for (_window_hash, specs) in specs_by_window {
2070 let context = evaluator.get_or_create_window_context(&specs[0].spec)?;
2072
2073 for spec in specs {
2075 match spec.function_name.as_str() {
2076 "LAG" => {
2077 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2079 let column_name = col_ref.name.as_str();
2080 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2081 spec.args.get(1)
2082 {
2083 n.parse::<i64>().unwrap_or(1)
2084 } else {
2085 1 };
2087
2088 let values = context.evaluate_lag_batch(
2089 visible_rows,
2090 column_name,
2091 offset,
2092 )?;
2093
2094 for (row_idx, value) in values.into_iter().enumerate() {
2096 batch_results[row_idx][spec.output_column_index] = value;
2097 }
2098 }
2099 }
2100 "LEAD" => {
2101 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2103 let column_name = col_ref.name.as_str();
2104 let offset = if let Some(SqlExpression::NumberLiteral(n)) =
2105 spec.args.get(1)
2106 {
2107 n.parse::<i64>().unwrap_or(1)
2108 } else {
2109 1 };
2111
2112 let values = context.evaluate_lead_batch(
2113 visible_rows,
2114 column_name,
2115 offset,
2116 )?;
2117
2118 for (row_idx, value) in values.into_iter().enumerate() {
2120 batch_results[row_idx][spec.output_column_index] = value;
2121 }
2122 }
2123 }
2124 "ROW_NUMBER" => {
2125 let values = context.evaluate_row_number_batch(visible_rows)?;
2126
2127 for (row_idx, value) in values.into_iter().enumerate() {
2129 batch_results[row_idx][spec.output_column_index] = value;
2130 }
2131 }
2132 "RANK" => {
2133 let values = context.evaluate_rank_batch(visible_rows)?;
2134
2135 for (row_idx, value) in values.into_iter().enumerate() {
2137 batch_results[row_idx][spec.output_column_index] = value;
2138 }
2139 }
2140 "DENSE_RANK" => {
2141 let values = context.evaluate_dense_rank_batch(visible_rows)?;
2142
2143 for (row_idx, value) in values.into_iter().enumerate() {
2145 batch_results[row_idx][spec.output_column_index] = value;
2146 }
2147 }
2148 "SUM" => {
2149 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2150 let column_name = col_ref.name.as_str();
2151 let values =
2152 context.evaluate_sum_batch(visible_rows, column_name)?;
2153
2154 for (row_idx, value) in values.into_iter().enumerate() {
2155 batch_results[row_idx][spec.output_column_index] = value;
2156 }
2157 }
2158 }
2159 "AVG" => {
2160 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2161 let column_name = col_ref.name.as_str();
2162 let values =
2163 context.evaluate_avg_batch(visible_rows, column_name)?;
2164
2165 for (row_idx, value) in values.into_iter().enumerate() {
2166 batch_results[row_idx][spec.output_column_index] = value;
2167 }
2168 }
2169 }
2170 "MIN" => {
2171 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2172 let column_name = col_ref.name.as_str();
2173 let values =
2174 context.evaluate_min_batch(visible_rows, column_name)?;
2175
2176 for (row_idx, value) in values.into_iter().enumerate() {
2177 batch_results[row_idx][spec.output_column_index] = value;
2178 }
2179 }
2180 }
2181 "MAX" => {
2182 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2183 let column_name = col_ref.name.as_str();
2184 let values =
2185 context.evaluate_max_batch(visible_rows, column_name)?;
2186
2187 for (row_idx, value) in values.into_iter().enumerate() {
2188 batch_results[row_idx][spec.output_column_index] = value;
2189 }
2190 }
2191 }
2192 "COUNT" => {
2193 let column_name = match spec.args.get(0) {
2195 Some(SqlExpression::Column(col_ref)) => Some(col_ref.name.as_str()),
2196 Some(SqlExpression::StringLiteral(s)) if s == "*" => None,
2197 _ => None,
2198 };
2199
2200 let values = context.evaluate_count_batch(visible_rows, column_name)?;
2201
2202 for (row_idx, value) in values.into_iter().enumerate() {
2203 batch_results[row_idx][spec.output_column_index] = value;
2204 }
2205 }
2206 "FIRST_VALUE" => {
2207 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2208 let column_name = col_ref.name.as_str();
2209 let values = context
2210 .evaluate_first_value_batch(visible_rows, column_name)?;
2211
2212 for (row_idx, value) in values.into_iter().enumerate() {
2213 batch_results[row_idx][spec.output_column_index] = value;
2214 }
2215 }
2216 }
2217 "LAST_VALUE" => {
2218 if let Some(SqlExpression::Column(col_ref)) = spec.args.get(0) {
2219 let column_name = col_ref.name.as_str();
2220 let values =
2221 context.evaluate_last_value_batch(visible_rows, column_name)?;
2222
2223 for (row_idx, value) in values.into_iter().enumerate() {
2224 batch_results[row_idx][spec.output_column_index] = value;
2225 }
2226 }
2227 }
2228 _ => {
2229 debug!(
2231 "Window function {} not supported in batch mode, using per-row",
2232 spec.function_name
2233 );
2234 }
2235 }
2236 }
2237 }
2238
2239 for (result_row_idx, &source_row_idx) in visible_rows.iter().enumerate() {
2241 for (col_idx, item) in expanded_items.iter().enumerate() {
2242 if !matches!(batch_results[result_row_idx][col_idx], DataValue::Null) {
2244 continue;
2245 }
2246
2247 let value = match item {
2248 SelectItem::Column {
2249 column: col_ref, ..
2250 } => {
2251 match evaluator
2252 .evaluate(&SqlExpression::Column(col_ref.clone()), source_row_idx)
2253 {
2254 Ok(val) => val,
2255 Err(e) => {
2256 return Err(anyhow!(
2257 "Failed to evaluate column {}: {}",
2258 col_ref.to_sql(),
2259 e
2260 ));
2261 }
2262 }
2263 }
2264 SelectItem::Expression { expr, .. } => {
2265 if matches!(expr, SqlExpression::WindowFunction { .. }) {
2268 continue;
2270 }
2271 evaluator.evaluate(&expr, source_row_idx)?
2274 }
2275 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2276 SelectItem::StarExclude { .. } => {
2277 unreachable!("StarExclude should have been expanded")
2278 }
2279 };
2280 batch_results[result_row_idx][col_idx] = value;
2281 }
2282 }
2283
2284 for row_values in batch_results {
2286 computed_table
2287 .add_row(DataRow::new(row_values))
2288 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2289 }
2290
2291 debug!(
2292 "Batch window evaluation completed in {:.3}ms",
2293 batch_start.elapsed().as_secs_f64() * 1000.0
2294 );
2295 } else {
2296 for &row_idx in visible_rows {
2298 let mut row_values = Vec::new();
2299
2300 for item in &expanded_items {
2301 let value = match item {
2302 SelectItem::Column {
2303 column: col_ref, ..
2304 } => {
2305 match evaluator
2307 .evaluate(&SqlExpression::Column(col_ref.clone()), row_idx)
2308 {
2309 Ok(val) => val,
2310 Err(e) => {
2311 return Err(anyhow!(
2312 "Failed to evaluate column {}: {}",
2313 col_ref.to_sql(),
2314 e
2315 ));
2316 }
2317 }
2318 }
2319 SelectItem::Expression { expr, .. } => {
2320 evaluator.evaluate(&expr, row_idx)?
2322 }
2323 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2324 SelectItem::StarExclude { .. } => {
2325 unreachable!("StarExclude should have been expanded")
2326 }
2327 };
2328 row_values.push(value);
2329 }
2330
2331 computed_table
2332 .add_row(DataRow::new(row_values))
2333 .map_err(|e| anyhow::anyhow!("Failed to add row: {}", e))?;
2334 }
2335 }
2336
2337 if let Some(start) = window_start {
2339 let window_duration = start.elapsed();
2340 info!(
2341 "Window function evaluation took {:.2}ms for {} rows ({} window functions)",
2342 window_duration.as_secs_f64() * 1000.0,
2343 visible_rows.len(),
2344 window_func_count
2345 );
2346
2347 plan.begin_step(
2349 StepType::WindowFunction,
2350 format!("Evaluate {} window function(s)", window_func_count),
2351 );
2352 plan.set_rows_in(visible_rows.len());
2353 plan.set_rows_out(visible_rows.len());
2354 plan.add_detail(format!("Input: {} rows", visible_rows.len()));
2355 plan.add_detail(format!("{} window functions evaluated", window_func_count));
2356 plan.add_detail(format!(
2357 "Evaluation time: {:.3}ms",
2358 window_duration.as_secs_f64() * 1000.0
2359 ));
2360 plan.end_step();
2361 }
2362
2363 Ok(DataView::new(Arc::new(computed_table)))
2366 }
2367
2368 fn apply_select_with_row_expansion(
2370 &self,
2371 view: DataView,
2372 select_items: &[SelectItem],
2373 ) -> Result<DataView> {
2374 debug!("QueryEngine::apply_select_with_row_expansion - expanding rows");
2375
2376 let source_table = view.source();
2377 let visible_rows = view.visible_row_indices();
2378 let expander_registry = RowExpanderRegistry::new();
2379
2380 let mut result_table = DataTable::new("unnest_result");
2382
2383 let mut expanded_items = Vec::new();
2385 for item in select_items {
2386 match item {
2387 SelectItem::Star { table_prefix, .. } => {
2388 if let Some(prefix) = table_prefix {
2389 debug!(
2391 "QueryEngine::apply_select_with_row_expansion - expanding {}.*",
2392 prefix
2393 );
2394 for col in &source_table.columns {
2395 if Self::column_matches_table(col, prefix) {
2396 expanded_items.push(SelectItem::Column {
2397 column: ColumnRef::unquoted(col.name.clone()),
2398 leading_comments: vec![],
2399 trailing_comment: None,
2400 });
2401 }
2402 }
2403 } else {
2404 debug!("QueryEngine::apply_select_with_row_expansion - expanding *");
2406 for col_name in source_table.column_names() {
2407 expanded_items.push(SelectItem::Column {
2408 column: ColumnRef::unquoted(col_name.to_string()),
2409 leading_comments: vec![],
2410 trailing_comment: None,
2411 });
2412 }
2413 }
2414 }
2415 _ => expanded_items.push(item.clone()),
2416 }
2417 }
2418
2419 for item in &expanded_items {
2421 let column_name = match item {
2422 SelectItem::Column {
2423 column: col_ref, ..
2424 } => col_ref.name.clone(),
2425 SelectItem::Expression { alias, .. } => alias.clone(),
2426 SelectItem::Star { .. } => unreachable!("Star should have been expanded"),
2427 SelectItem::StarExclude { .. } => {
2428 unreachable!("StarExclude should have been expanded")
2429 }
2430 };
2431 result_table.add_column(DataColumn::new(&column_name));
2432 }
2433
2434 let mut evaluator =
2436 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone());
2437
2438 for &row_idx in visible_rows {
2439 let mut unnest_expansions = Vec::new();
2441 let mut unnest_indices = Vec::new();
2442
2443 for (col_idx, item) in expanded_items.iter().enumerate() {
2444 if let SelectItem::Expression { expr, .. } = item {
2445 if let Some(expansion_result) = self.try_expand_unnest(
2446 &expr,
2447 source_table,
2448 row_idx,
2449 &mut evaluator,
2450 &expander_registry,
2451 )? {
2452 unnest_expansions.push(expansion_result);
2453 unnest_indices.push(col_idx);
2454 }
2455 }
2456 }
2457
2458 let expansion_count = if unnest_expansions.is_empty() {
2460 1 } else {
2462 unnest_expansions
2463 .iter()
2464 .map(|exp| exp.row_count())
2465 .max()
2466 .unwrap_or(1)
2467 };
2468
2469 for output_idx in 0..expansion_count {
2471 let mut row_values = Vec::new();
2472
2473 for (col_idx, item) in expanded_items.iter().enumerate() {
2474 let unnest_position = unnest_indices.iter().position(|&idx| idx == col_idx);
2476
2477 let value = if let Some(unnest_idx) = unnest_position {
2478 let expansion = &unnest_expansions[unnest_idx];
2480 expansion
2481 .values
2482 .get(output_idx)
2483 .cloned()
2484 .unwrap_or(DataValue::Null)
2485 } else {
2486 match item {
2488 SelectItem::Column {
2489 column: col_ref, ..
2490 } => {
2491 let col_idx =
2492 source_table.get_column_index(&col_ref.name).ok_or_else(
2493 || anyhow::anyhow!("Column '{}' not found", col_ref.name),
2494 )?;
2495 let row = source_table
2496 .get_row(row_idx)
2497 .ok_or_else(|| anyhow::anyhow!("Row {} not found", row_idx))?;
2498 row.get(col_idx)
2499 .ok_or_else(|| {
2500 anyhow::anyhow!("Column {} not found in row", col_idx)
2501 })?
2502 .clone()
2503 }
2504 SelectItem::Expression { expr, .. } => {
2505 evaluator.evaluate(&expr, row_idx)?
2507 }
2508 SelectItem::Star { .. } => unreachable!(),
2509 SelectItem::StarExclude { .. } => {
2510 unreachable!("StarExclude should have been expanded")
2511 }
2512 }
2513 };
2514
2515 row_values.push(value);
2516 }
2517
2518 result_table
2519 .add_row(DataRow::new(row_values))
2520 .map_err(|e| anyhow::anyhow!("Failed to add expanded row: {}", e))?;
2521 }
2522 }
2523
2524 debug!(
2525 "QueryEngine::apply_select_with_row_expansion - input rows: {}, output rows: {}",
2526 visible_rows.len(),
2527 result_table.row_count()
2528 );
2529
2530 Ok(DataView::new(Arc::new(result_table)))
2531 }
2532
2533 fn try_expand_unnest(
2536 &self,
2537 expr: &SqlExpression,
2538 _source_table: &DataTable,
2539 row_idx: usize,
2540 evaluator: &mut ArithmeticEvaluator,
2541 expander_registry: &RowExpanderRegistry,
2542 ) -> Result<Option<crate::data::row_expanders::ExpansionResult>> {
2543 if let SqlExpression::Unnest { column, delimiter } = expr {
2545 let column_value = evaluator.evaluate(column, row_idx)?;
2547
2548 let delimiter_value = DataValue::String(delimiter.clone());
2550
2551 let expander = expander_registry
2553 .get("UNNEST")
2554 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2555
2556 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2558 return Ok(Some(expansion));
2559 }
2560
2561 if let SqlExpression::FunctionCall { name, args, .. } = expr {
2563 if name.to_uppercase() == "UNNEST" {
2564 if args.len() != 2 {
2566 return Err(anyhow::anyhow!(
2567 "UNNEST requires exactly 2 arguments: UNNEST(column, delimiter)"
2568 ));
2569 }
2570
2571 let column_value = evaluator.evaluate(&args[0], row_idx)?;
2573
2574 let delimiter_value = evaluator.evaluate(&args[1], row_idx)?;
2576
2577 let expander = expander_registry
2579 .get("UNNEST")
2580 .ok_or_else(|| anyhow::anyhow!("UNNEST expander not found"))?;
2581
2582 let expansion = expander.expand(&column_value, &[delimiter_value])?;
2584 return Ok(Some(expansion));
2585 }
2586 }
2587
2588 Ok(None)
2589 }
2590
2591 fn apply_aggregate_select(
2593 &self,
2594 view: DataView,
2595 select_items: &[SelectItem],
2596 ) -> Result<DataView> {
2597 debug!("QueryEngine::apply_aggregate_select - creating single row aggregate result");
2598
2599 let source_table = view.source();
2600 let mut result_table = DataTable::new("aggregate_result");
2601
2602 for item in select_items {
2604 let column_name = match item {
2605 SelectItem::Expression { alias, .. } => alias.clone(),
2606 _ => unreachable!("Should only have expressions in aggregate-only query"),
2607 };
2608 result_table.add_column(DataColumn::new(&column_name));
2609 }
2610
2611 let visible_rows = view.visible_row_indices().to_vec();
2613 let mut evaluator =
2614 ArithmeticEvaluator::with_date_notation(source_table, self.date_notation.clone())
2615 .with_visible_rows(visible_rows);
2616
2617 let mut row_values = Vec::new();
2619 for item in select_items {
2620 match item {
2621 SelectItem::Expression { expr, .. } => {
2622 let value = evaluator.evaluate(expr, 0)?;
2625 row_values.push(value);
2626 }
2627 _ => unreachable!("Should only have expressions in aggregate-only query"),
2628 }
2629 }
2630
2631 result_table
2633 .add_row(DataRow::new(row_values))
2634 .map_err(|e| anyhow::anyhow!("Failed to add aggregate result row: {}", e))?;
2635
2636 Ok(DataView::new(Arc::new(result_table)))
2637 }
2638
2639 fn column_matches_table(col: &DataColumn, table_name: &str) -> bool {
2651 if let Some(ref source) = col.source_table {
2653 if source == table_name || source.ends_with(&format!(".{}", table_name)) {
2655 return true;
2656 }
2657 }
2658
2659 if let Some(ref qualified) = col.qualified_name {
2661 if qualified.starts_with(&format!("{}.", table_name)) {
2663 return true;
2664 }
2665 }
2666
2667 false
2668 }
2669
2670 fn resolve_select_columns(
2672 &self,
2673 table: &DataTable,
2674 select_items: &[SelectItem],
2675 ) -> Result<Vec<usize>> {
2676 let mut indices = Vec::new();
2677 let table_columns = table.column_names();
2678
2679 for item in select_items {
2680 match item {
2681 SelectItem::Column {
2682 column: col_ref, ..
2683 } => {
2684 let index = if let Some(table_prefix) = &col_ref.table_prefix {
2686 let qualified_name = format!("{}.{}", table_prefix, col_ref.name);
2688 table.find_column_by_qualified_name(&qualified_name)
2689 .ok_or_else(|| {
2690 let has_qualified = table.columns.iter()
2692 .any(|c| c.qualified_name.is_some());
2693 if !has_qualified {
2694 anyhow::anyhow!(
2695 "Column '{}' not found. Note: Table '{}' may not support qualified column names",
2696 qualified_name, table_prefix
2697 )
2698 } else {
2699 anyhow::anyhow!("Column '{}' not found", qualified_name)
2700 }
2701 })?
2702 } else {
2703 table_columns
2705 .iter()
2706 .position(|c| c.eq_ignore_ascii_case(&col_ref.name))
2707 .ok_or_else(|| {
2708 let suggestion = self.find_similar_column(table, &col_ref.name);
2709 match suggestion {
2710 Some(similar) => anyhow::anyhow!(
2711 "Column '{}' not found. Did you mean '{}'?",
2712 col_ref.name,
2713 similar
2714 ),
2715 None => anyhow::anyhow!("Column '{}' not found", col_ref.name),
2716 }
2717 })?
2718 };
2719 indices.push(index);
2720 }
2721 SelectItem::Star { table_prefix, .. } => {
2722 if let Some(prefix) = table_prefix {
2723 for (i, col) in table.columns.iter().enumerate() {
2725 if Self::column_matches_table(col, prefix) {
2726 indices.push(i);
2727 }
2728 }
2729 } else {
2730 for i in 0..table_columns.len() {
2732 indices.push(i);
2733 }
2734 }
2735 }
2736 SelectItem::StarExclude {
2737 table_prefix,
2738 excluded_columns,
2739 ..
2740 } => {
2741 if let Some(prefix) = table_prefix {
2743 for (i, col) in table.columns.iter().enumerate() {
2745 if Self::column_matches_table(col, prefix)
2746 && !excluded_columns.contains(&col.name)
2747 {
2748 indices.push(i);
2749 }
2750 }
2751 } else {
2752 for (i, col_name) in table_columns.iter().enumerate() {
2754 if !excluded_columns
2755 .iter()
2756 .any(|exc| exc.eq_ignore_ascii_case(col_name))
2757 {
2758 indices.push(i);
2759 }
2760 }
2761 }
2762 }
2763 SelectItem::Expression { .. } => {
2764 return Err(anyhow::anyhow!(
2765 "Computed expressions require new table creation"
2766 ));
2767 }
2768 }
2769 }
2770
2771 Ok(indices)
2772 }
2773
2774 fn apply_distinct(&self, view: DataView) -> Result<DataView> {
2776 use std::collections::HashSet;
2777
2778 let source = view.source();
2779 let visible_cols = view.visible_column_indices();
2780 let visible_rows = view.visible_row_indices();
2781
2782 let mut seen_rows = HashSet::new();
2784 let mut unique_row_indices = Vec::new();
2785
2786 for &row_idx in visible_rows {
2787 let mut row_key = Vec::new();
2789 for &col_idx in visible_cols {
2790 let value = source
2791 .get_value(row_idx, col_idx)
2792 .ok_or_else(|| anyhow!("Invalid cell reference"))?;
2793 row_key.push(format!("{:?}", value));
2795 }
2796
2797 if seen_rows.insert(row_key) {
2799 unique_row_indices.push(row_idx);
2801 }
2802 }
2803
2804 Ok(view.with_rows(unique_row_indices))
2806 }
2807
2808 fn apply_multi_order_by(
2810 &self,
2811 view: DataView,
2812 order_by_columns: &[OrderByItem],
2813 ) -> Result<DataView> {
2814 self.apply_multi_order_by_with_context(view, order_by_columns, None)
2815 }
2816
2817 fn apply_multi_order_by_with_context(
2819 &self,
2820 mut view: DataView,
2821 order_by_columns: &[OrderByItem],
2822 _exec_context: Option<&ExecutionContext>,
2823 ) -> Result<DataView> {
2824 let mut sort_columns = Vec::new();
2826
2827 for order_col in order_by_columns {
2828 let column_name = match &order_col.expr {
2830 SqlExpression::Column(col_ref) => col_ref.name.clone(),
2831 _ => {
2832 return Err(anyhow!(
2834 "ORDER BY expressions not yet supported - only simple columns allowed"
2835 ));
2836 }
2837 };
2838
2839 let col_index = if column_name.contains('.') {
2841 if let Some(dot_pos) = column_name.rfind('.') {
2843 let col_name = &column_name[dot_pos + 1..];
2844
2845 debug!(
2848 "ORDER BY: Extracting unqualified column '{}' from '{}'",
2849 col_name, column_name
2850 );
2851 view.source().get_column_index(col_name)
2852 } else {
2853 view.source().get_column_index(&column_name)
2854 }
2855 } else {
2856 view.source().get_column_index(&column_name)
2858 }
2859 .ok_or_else(|| {
2860 let suggestion = self.find_similar_column(view.source(), &column_name);
2862 match suggestion {
2863 Some(similar) => anyhow::anyhow!(
2864 "Column '{}' not found. Did you mean '{}'?",
2865 column_name,
2866 similar
2867 ),
2868 None => {
2869 let available_cols = view.source().column_names().join(", ");
2871 anyhow::anyhow!(
2872 "Column '{}' not found. Available columns: {}",
2873 column_name,
2874 available_cols
2875 )
2876 }
2877 }
2878 })?;
2879
2880 let ascending = matches!(order_col.direction, SortDirection::Asc);
2881 sort_columns.push((col_index, ascending));
2882 }
2883
2884 view.apply_multi_sort(&sort_columns)?;
2886 Ok(view)
2887 }
2888
2889 fn apply_group_by(
2891 &self,
2892 view: DataView,
2893 group_by_exprs: &[SqlExpression],
2894 select_items: &[SelectItem],
2895 having: Option<&SqlExpression>,
2896 plan: &mut ExecutionPlanBuilder,
2897 ) -> Result<DataView> {
2898 let (result_view, phase_info) = self.apply_group_by_expressions(
2900 view,
2901 group_by_exprs,
2902 select_items,
2903 having,
2904 self.case_insensitive,
2905 self.date_notation.clone(),
2906 )?;
2907
2908 plan.add_detail(format!("=== GROUP BY Phase Breakdown ==="));
2910 plan.add_detail(format!(
2911 "Phase 1 - Group Building: {:.3}ms",
2912 phase_info.phase2_key_building.as_secs_f64() * 1000.0
2913 ));
2914 plan.add_detail(format!(
2915 " • Processing {} rows into {} groups",
2916 phase_info.total_rows, phase_info.num_groups
2917 ));
2918 plan.add_detail(format!(
2919 "Phase 2 - Aggregation: {:.3}ms",
2920 phase_info.phase4_aggregation.as_secs_f64() * 1000.0
2921 ));
2922 if phase_info.phase4_having_evaluation > Duration::ZERO {
2923 plan.add_detail(format!(
2924 "Phase 3 - HAVING Filter: {:.3}ms",
2925 phase_info.phase4_having_evaluation.as_secs_f64() * 1000.0
2926 ));
2927 plan.add_detail(format!(
2928 " • Filtered {} groups",
2929 phase_info.groups_filtered_by_having
2930 ));
2931 }
2932 plan.add_detail(format!(
2933 "Total GROUP BY time: {:.3}ms",
2934 phase_info.total_time.as_secs_f64() * 1000.0
2935 ));
2936
2937 Ok(result_view)
2938 }
2939
2940 pub fn estimate_group_cardinality(
2943 &self,
2944 view: &DataView,
2945 group_by_exprs: &[SqlExpression],
2946 ) -> usize {
2947 let row_count = view.get_visible_rows().len();
2949 if row_count <= 100 {
2950 return row_count;
2951 }
2952
2953 let sample_size = min(1000, row_count / 10).max(100);
2955 let mut seen = FxHashSet::default();
2956
2957 let visible_rows = view.get_visible_rows();
2958 for (i, &row_idx) in visible_rows.iter().enumerate() {
2959 if i >= sample_size {
2960 break;
2961 }
2962
2963 let mut key_values = Vec::new();
2965 for expr in group_by_exprs {
2966 let mut evaluator = ArithmeticEvaluator::new(view.source());
2967 let value = evaluator.evaluate(expr, row_idx).unwrap_or(DataValue::Null);
2968 key_values.push(value);
2969 }
2970
2971 seen.insert(key_values);
2972 }
2973
2974 let sample_cardinality = seen.len();
2976 let estimated = (sample_cardinality * row_count) / sample_size;
2977
2978 estimated.min(row_count).max(sample_cardinality)
2980 }
2981}
2982
2983#[cfg(test)]
2984mod tests {
2985 use super::*;
2986 use crate::data::datatable::{DataColumn, DataRow, DataValue};
2987
2988 fn create_test_table() -> Arc<DataTable> {
2989 let mut table = DataTable::new("test");
2990
2991 table.add_column(DataColumn::new("id"));
2993 table.add_column(DataColumn::new("name"));
2994 table.add_column(DataColumn::new("age"));
2995
2996 table
2998 .add_row(DataRow::new(vec![
2999 DataValue::Integer(1),
3000 DataValue::String("Alice".to_string()),
3001 DataValue::Integer(30),
3002 ]))
3003 .unwrap();
3004
3005 table
3006 .add_row(DataRow::new(vec![
3007 DataValue::Integer(2),
3008 DataValue::String("Bob".to_string()),
3009 DataValue::Integer(25),
3010 ]))
3011 .unwrap();
3012
3013 table
3014 .add_row(DataRow::new(vec![
3015 DataValue::Integer(3),
3016 DataValue::String("Charlie".to_string()),
3017 DataValue::Integer(35),
3018 ]))
3019 .unwrap();
3020
3021 Arc::new(table)
3022 }
3023
3024 #[test]
3025 fn test_select_all() {
3026 let table = create_test_table();
3027 let engine = QueryEngine::new();
3028
3029 let view = engine
3030 .execute(table.clone(), "SELECT * FROM users")
3031 .unwrap();
3032 assert_eq!(view.row_count(), 3);
3033 assert_eq!(view.column_count(), 3);
3034 }
3035
3036 #[test]
3037 fn test_select_columns() {
3038 let table = create_test_table();
3039 let engine = QueryEngine::new();
3040
3041 let view = engine
3042 .execute(table.clone(), "SELECT name, age FROM users")
3043 .unwrap();
3044 assert_eq!(view.row_count(), 3);
3045 assert_eq!(view.column_count(), 2);
3046 }
3047
3048 #[test]
3049 fn test_select_with_limit() {
3050 let table = create_test_table();
3051 let engine = QueryEngine::new();
3052
3053 let view = engine
3054 .execute(table.clone(), "SELECT * FROM users LIMIT 2")
3055 .unwrap();
3056 assert_eq!(view.row_count(), 2);
3057 }
3058
3059 #[test]
3060 fn test_type_coercion_contains() {
3061 let _ = tracing_subscriber::fmt()
3063 .with_max_level(tracing::Level::DEBUG)
3064 .try_init();
3065
3066 let mut table = DataTable::new("test");
3067 table.add_column(DataColumn::new("id"));
3068 table.add_column(DataColumn::new("status"));
3069 table.add_column(DataColumn::new("price"));
3070
3071 table
3073 .add_row(DataRow::new(vec![
3074 DataValue::Integer(1),
3075 DataValue::String("Pending".to_string()),
3076 DataValue::Float(99.99),
3077 ]))
3078 .unwrap();
3079
3080 table
3081 .add_row(DataRow::new(vec![
3082 DataValue::Integer(2),
3083 DataValue::String("Confirmed".to_string()),
3084 DataValue::Float(150.50),
3085 ]))
3086 .unwrap();
3087
3088 table
3089 .add_row(DataRow::new(vec![
3090 DataValue::Integer(3),
3091 DataValue::String("Pending".to_string()),
3092 DataValue::Float(75.00),
3093 ]))
3094 .unwrap();
3095
3096 let table = Arc::new(table);
3097 let engine = QueryEngine::new();
3098
3099 println!("\n=== Testing WHERE clause with Contains ===");
3100 println!("Table has {} rows", table.row_count());
3101 for i in 0..table.row_count() {
3102 let status = table.get_value(i, 1);
3103 println!("Row {i}: status = {status:?}");
3104 }
3105
3106 println!("\n--- Test 1: status.Contains('pend') ---");
3108 let result = engine.execute(
3109 table.clone(),
3110 "SELECT * FROM test WHERE status.Contains('pend')",
3111 );
3112 match result {
3113 Ok(view) => {
3114 println!("SUCCESS: Found {} matching rows", view.row_count());
3115 assert_eq!(view.row_count(), 2); }
3117 Err(e) => {
3118 panic!("Query failed: {e}");
3119 }
3120 }
3121
3122 println!("\n--- Test 2: price.Contains('9') ---");
3124 let result = engine.execute(
3125 table.clone(),
3126 "SELECT * FROM test WHERE price.Contains('9')",
3127 );
3128 match result {
3129 Ok(view) => {
3130 println!(
3131 "SUCCESS: Found {} matching rows with price containing '9'",
3132 view.row_count()
3133 );
3134 assert!(view.row_count() >= 1);
3136 }
3137 Err(e) => {
3138 panic!("Numeric coercion query failed: {e}");
3139 }
3140 }
3141
3142 println!("\n=== All tests passed! ===");
3143 }
3144
3145 #[test]
3146 fn test_not_in_clause() {
3147 let _ = tracing_subscriber::fmt()
3149 .with_max_level(tracing::Level::DEBUG)
3150 .try_init();
3151
3152 let mut table = DataTable::new("test");
3153 table.add_column(DataColumn::new("id"));
3154 table.add_column(DataColumn::new("country"));
3155
3156 table
3158 .add_row(DataRow::new(vec![
3159 DataValue::Integer(1),
3160 DataValue::String("CA".to_string()),
3161 ]))
3162 .unwrap();
3163
3164 table
3165 .add_row(DataRow::new(vec![
3166 DataValue::Integer(2),
3167 DataValue::String("US".to_string()),
3168 ]))
3169 .unwrap();
3170
3171 table
3172 .add_row(DataRow::new(vec![
3173 DataValue::Integer(3),
3174 DataValue::String("UK".to_string()),
3175 ]))
3176 .unwrap();
3177
3178 let table = Arc::new(table);
3179 let engine = QueryEngine::new();
3180
3181 println!("\n=== Testing NOT IN clause ===");
3182 println!("Table has {} rows", table.row_count());
3183 for i in 0..table.row_count() {
3184 let country = table.get_value(i, 1);
3185 println!("Row {i}: country = {country:?}");
3186 }
3187
3188 println!("\n--- Test: country NOT IN ('CA') ---");
3190 let result = engine.execute(
3191 table.clone(),
3192 "SELECT * FROM test WHERE country NOT IN ('CA')",
3193 );
3194 match result {
3195 Ok(view) => {
3196 println!("SUCCESS: Found {} rows not in ('CA')", view.row_count());
3197 assert_eq!(view.row_count(), 2); }
3199 Err(e) => {
3200 panic!("NOT IN query failed: {e}");
3201 }
3202 }
3203
3204 println!("\n=== NOT IN test complete! ===");
3205 }
3206
3207 #[test]
3208 fn test_case_insensitive_in_and_not_in() {
3209 let _ = tracing_subscriber::fmt()
3211 .with_max_level(tracing::Level::DEBUG)
3212 .try_init();
3213
3214 let mut table = DataTable::new("test");
3215 table.add_column(DataColumn::new("id"));
3216 table.add_column(DataColumn::new("country"));
3217
3218 table
3220 .add_row(DataRow::new(vec![
3221 DataValue::Integer(1),
3222 DataValue::String("CA".to_string()), ]))
3224 .unwrap();
3225
3226 table
3227 .add_row(DataRow::new(vec![
3228 DataValue::Integer(2),
3229 DataValue::String("us".to_string()), ]))
3231 .unwrap();
3232
3233 table
3234 .add_row(DataRow::new(vec![
3235 DataValue::Integer(3),
3236 DataValue::String("UK".to_string()), ]))
3238 .unwrap();
3239
3240 let table = Arc::new(table);
3241
3242 println!("\n=== Testing Case-Insensitive IN clause ===");
3243 println!("Table has {} rows", table.row_count());
3244 for i in 0..table.row_count() {
3245 let country = table.get_value(i, 1);
3246 println!("Row {i}: country = {country:?}");
3247 }
3248
3249 println!("\n--- Test: country IN ('ca') with case_insensitive=true ---");
3251 let engine = QueryEngine::with_case_insensitive(true);
3252 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3253 match result {
3254 Ok(view) => {
3255 println!(
3256 "SUCCESS: Found {} rows matching 'ca' (case-insensitive)",
3257 view.row_count()
3258 );
3259 assert_eq!(view.row_count(), 1); }
3261 Err(e) => {
3262 panic!("Case-insensitive IN query failed: {e}");
3263 }
3264 }
3265
3266 println!("\n--- Test: country NOT IN ('ca') with case_insensitive=true ---");
3268 let result = engine.execute(
3269 table.clone(),
3270 "SELECT * FROM test WHERE country NOT IN ('ca')",
3271 );
3272 match result {
3273 Ok(view) => {
3274 println!(
3275 "SUCCESS: Found {} rows not matching 'ca' (case-insensitive)",
3276 view.row_count()
3277 );
3278 assert_eq!(view.row_count(), 2); }
3280 Err(e) => {
3281 panic!("Case-insensitive NOT IN query failed: {e}");
3282 }
3283 }
3284
3285 println!("\n--- Test: country IN ('ca') with case_insensitive=false ---");
3287 let engine_case_sensitive = QueryEngine::new(); let result = engine_case_sensitive
3289 .execute(table.clone(), "SELECT * FROM test WHERE country IN ('ca')");
3290 match result {
3291 Ok(view) => {
3292 println!(
3293 "SUCCESS: Found {} rows matching 'ca' (case-sensitive)",
3294 view.row_count()
3295 );
3296 assert_eq!(view.row_count(), 0); }
3298 Err(e) => {
3299 panic!("Case-sensitive IN query failed: {e}");
3300 }
3301 }
3302
3303 println!("\n=== Case-insensitive IN/NOT IN test complete! ===");
3304 }
3305
3306 #[test]
3307 #[ignore = "Parentheses in WHERE clause not yet implemented"]
3308 fn test_parentheses_in_where_clause() {
3309 let _ = tracing_subscriber::fmt()
3311 .with_max_level(tracing::Level::DEBUG)
3312 .try_init();
3313
3314 let mut table = DataTable::new("test");
3315 table.add_column(DataColumn::new("id"));
3316 table.add_column(DataColumn::new("status"));
3317 table.add_column(DataColumn::new("priority"));
3318
3319 table
3321 .add_row(DataRow::new(vec![
3322 DataValue::Integer(1),
3323 DataValue::String("Pending".to_string()),
3324 DataValue::String("High".to_string()),
3325 ]))
3326 .unwrap();
3327
3328 table
3329 .add_row(DataRow::new(vec![
3330 DataValue::Integer(2),
3331 DataValue::String("Complete".to_string()),
3332 DataValue::String("High".to_string()),
3333 ]))
3334 .unwrap();
3335
3336 table
3337 .add_row(DataRow::new(vec![
3338 DataValue::Integer(3),
3339 DataValue::String("Pending".to_string()),
3340 DataValue::String("Low".to_string()),
3341 ]))
3342 .unwrap();
3343
3344 table
3345 .add_row(DataRow::new(vec![
3346 DataValue::Integer(4),
3347 DataValue::String("Complete".to_string()),
3348 DataValue::String("Low".to_string()),
3349 ]))
3350 .unwrap();
3351
3352 let table = Arc::new(table);
3353 let engine = QueryEngine::new();
3354
3355 println!("\n=== Testing Parentheses in WHERE clause ===");
3356 println!("Table has {} rows", table.row_count());
3357 for i in 0..table.row_count() {
3358 let status = table.get_value(i, 1);
3359 let priority = table.get_value(i, 2);
3360 println!("Row {i}: status = {status:?}, priority = {priority:?}");
3361 }
3362
3363 println!("\n--- Test: (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low') ---");
3365 let result = engine.execute(
3366 table.clone(),
3367 "SELECT * FROM test WHERE (status = 'Pending' AND priority = 'High') OR (status = 'Complete' AND priority = 'Low')",
3368 );
3369 match result {
3370 Ok(view) => {
3371 println!(
3372 "SUCCESS: Found {} rows with parenthetical logic",
3373 view.row_count()
3374 );
3375 assert_eq!(view.row_count(), 2); }
3377 Err(e) => {
3378 panic!("Parentheses query failed: {e}");
3379 }
3380 }
3381
3382 println!("\n=== Parentheses test complete! ===");
3383 }
3384
3385 #[test]
3386 #[ignore = "Numeric type coercion needs fixing"]
3387 fn test_numeric_type_coercion() {
3388 let _ = tracing_subscriber::fmt()
3390 .with_max_level(tracing::Level::DEBUG)
3391 .try_init();
3392
3393 let mut table = DataTable::new("test");
3394 table.add_column(DataColumn::new("id"));
3395 table.add_column(DataColumn::new("price"));
3396 table.add_column(DataColumn::new("quantity"));
3397
3398 table
3400 .add_row(DataRow::new(vec![
3401 DataValue::Integer(1),
3402 DataValue::Float(99.50), DataValue::Integer(100),
3404 ]))
3405 .unwrap();
3406
3407 table
3408 .add_row(DataRow::new(vec![
3409 DataValue::Integer(2),
3410 DataValue::Float(150.0), DataValue::Integer(200),
3412 ]))
3413 .unwrap();
3414
3415 table
3416 .add_row(DataRow::new(vec![
3417 DataValue::Integer(3),
3418 DataValue::Integer(75), DataValue::Integer(50),
3420 ]))
3421 .unwrap();
3422
3423 let table = Arc::new(table);
3424 let engine = QueryEngine::new();
3425
3426 println!("\n=== Testing Numeric Type Coercion ===");
3427 println!("Table has {} rows", table.row_count());
3428 for i in 0..table.row_count() {
3429 let price = table.get_value(i, 1);
3430 let quantity = table.get_value(i, 2);
3431 println!("Row {i}: price = {price:?}, quantity = {quantity:?}");
3432 }
3433
3434 println!("\n--- Test: price.Contains('.') ---");
3436 let result = engine.execute(
3437 table.clone(),
3438 "SELECT * FROM test WHERE price.Contains('.')",
3439 );
3440 match result {
3441 Ok(view) => {
3442 println!(
3443 "SUCCESS: Found {} rows with decimal points in price",
3444 view.row_count()
3445 );
3446 assert_eq!(view.row_count(), 2); }
3448 Err(e) => {
3449 panic!("Numeric Contains query failed: {e}");
3450 }
3451 }
3452
3453 println!("\n--- Test: quantity.Contains('0') ---");
3455 let result = engine.execute(
3456 table.clone(),
3457 "SELECT * FROM test WHERE quantity.Contains('0')",
3458 );
3459 match result {
3460 Ok(view) => {
3461 println!(
3462 "SUCCESS: Found {} rows with '0' in quantity",
3463 view.row_count()
3464 );
3465 assert_eq!(view.row_count(), 2); }
3467 Err(e) => {
3468 panic!("Integer Contains query failed: {e}");
3469 }
3470 }
3471
3472 println!("\n=== Numeric type coercion test complete! ===");
3473 }
3474
3475 #[test]
3476 fn test_datetime_comparisons() {
3477 let _ = tracing_subscriber::fmt()
3479 .with_max_level(tracing::Level::DEBUG)
3480 .try_init();
3481
3482 let mut table = DataTable::new("test");
3483 table.add_column(DataColumn::new("id"));
3484 table.add_column(DataColumn::new("created_date"));
3485
3486 table
3488 .add_row(DataRow::new(vec![
3489 DataValue::Integer(1),
3490 DataValue::String("2024-12-15".to_string()),
3491 ]))
3492 .unwrap();
3493
3494 table
3495 .add_row(DataRow::new(vec![
3496 DataValue::Integer(2),
3497 DataValue::String("2025-01-15".to_string()),
3498 ]))
3499 .unwrap();
3500
3501 table
3502 .add_row(DataRow::new(vec![
3503 DataValue::Integer(3),
3504 DataValue::String("2025-02-15".to_string()),
3505 ]))
3506 .unwrap();
3507
3508 let table = Arc::new(table);
3509 let engine = QueryEngine::new();
3510
3511 println!("\n=== Testing DateTime Comparisons ===");
3512 println!("Table has {} rows", table.row_count());
3513 for i in 0..table.row_count() {
3514 let date = table.get_value(i, 1);
3515 println!("Row {i}: created_date = {date:?}");
3516 }
3517
3518 println!("\n--- Test: created_date > DateTime(2025,1,1) ---");
3520 let result = engine.execute(
3521 table.clone(),
3522 "SELECT * FROM test WHERE created_date > DateTime(2025,1,1)",
3523 );
3524 match result {
3525 Ok(view) => {
3526 println!("SUCCESS: Found {} rows after 2025-01-01", view.row_count());
3527 assert_eq!(view.row_count(), 2); }
3529 Err(e) => {
3530 panic!("DateTime comparison query failed: {e}");
3531 }
3532 }
3533
3534 println!("\n=== DateTime comparison test complete! ===");
3535 }
3536
3537 #[test]
3538 fn test_not_with_method_calls() {
3539 let _ = tracing_subscriber::fmt()
3541 .with_max_level(tracing::Level::DEBUG)
3542 .try_init();
3543
3544 let mut table = DataTable::new("test");
3545 table.add_column(DataColumn::new("id"));
3546 table.add_column(DataColumn::new("status"));
3547
3548 table
3550 .add_row(DataRow::new(vec![
3551 DataValue::Integer(1),
3552 DataValue::String("Pending Review".to_string()),
3553 ]))
3554 .unwrap();
3555
3556 table
3557 .add_row(DataRow::new(vec![
3558 DataValue::Integer(2),
3559 DataValue::String("Complete".to_string()),
3560 ]))
3561 .unwrap();
3562
3563 table
3564 .add_row(DataRow::new(vec![
3565 DataValue::Integer(3),
3566 DataValue::String("Pending Approval".to_string()),
3567 ]))
3568 .unwrap();
3569
3570 let table = Arc::new(table);
3571 let engine = QueryEngine::with_case_insensitive(true);
3572
3573 println!("\n=== Testing NOT with Method Calls ===");
3574 println!("Table has {} rows", table.row_count());
3575 for i in 0..table.row_count() {
3576 let status = table.get_value(i, 1);
3577 println!("Row {i}: status = {status:?}");
3578 }
3579
3580 println!("\n--- Test: NOT status.Contains('pend') ---");
3582 let result = engine.execute(
3583 table.clone(),
3584 "SELECT * FROM test WHERE NOT status.Contains('pend')",
3585 );
3586 match result {
3587 Ok(view) => {
3588 println!(
3589 "SUCCESS: Found {} rows NOT containing 'pend'",
3590 view.row_count()
3591 );
3592 assert_eq!(view.row_count(), 1); }
3594 Err(e) => {
3595 panic!("NOT Contains query failed: {e}");
3596 }
3597 }
3598
3599 println!("\n--- Test: NOT status.StartsWith('Pending') ---");
3601 let result = engine.execute(
3602 table.clone(),
3603 "SELECT * FROM test WHERE NOT status.StartsWith('Pending')",
3604 );
3605 match result {
3606 Ok(view) => {
3607 println!(
3608 "SUCCESS: Found {} rows NOT starting with 'Pending'",
3609 view.row_count()
3610 );
3611 assert_eq!(view.row_count(), 1); }
3613 Err(e) => {
3614 panic!("NOT StartsWith query failed: {e}");
3615 }
3616 }
3617
3618 println!("\n=== NOT with method calls test complete! ===");
3619 }
3620
3621 #[test]
3622 #[ignore = "Complex logical expressions with parentheses not yet implemented"]
3623 fn test_complex_logical_expressions() {
3624 let _ = tracing_subscriber::fmt()
3626 .with_max_level(tracing::Level::DEBUG)
3627 .try_init();
3628
3629 let mut table = DataTable::new("test");
3630 table.add_column(DataColumn::new("id"));
3631 table.add_column(DataColumn::new("status"));
3632 table.add_column(DataColumn::new("priority"));
3633 table.add_column(DataColumn::new("assigned"));
3634
3635 table
3637 .add_row(DataRow::new(vec![
3638 DataValue::Integer(1),
3639 DataValue::String("Pending".to_string()),
3640 DataValue::String("High".to_string()),
3641 DataValue::String("John".to_string()),
3642 ]))
3643 .unwrap();
3644
3645 table
3646 .add_row(DataRow::new(vec![
3647 DataValue::Integer(2),
3648 DataValue::String("Complete".to_string()),
3649 DataValue::String("High".to_string()),
3650 DataValue::String("Jane".to_string()),
3651 ]))
3652 .unwrap();
3653
3654 table
3655 .add_row(DataRow::new(vec![
3656 DataValue::Integer(3),
3657 DataValue::String("Pending".to_string()),
3658 DataValue::String("Low".to_string()),
3659 DataValue::String("John".to_string()),
3660 ]))
3661 .unwrap();
3662
3663 table
3664 .add_row(DataRow::new(vec![
3665 DataValue::Integer(4),
3666 DataValue::String("In Progress".to_string()),
3667 DataValue::String("Medium".to_string()),
3668 DataValue::String("Jane".to_string()),
3669 ]))
3670 .unwrap();
3671
3672 let table = Arc::new(table);
3673 let engine = QueryEngine::new();
3674
3675 println!("\n=== Testing Complex Logical Expressions ===");
3676 println!("Table has {} rows", table.row_count());
3677 for i in 0..table.row_count() {
3678 let status = table.get_value(i, 1);
3679 let priority = table.get_value(i, 2);
3680 let assigned = table.get_value(i, 3);
3681 println!(
3682 "Row {i}: status = {status:?}, priority = {priority:?}, assigned = {assigned:?}"
3683 );
3684 }
3685
3686 println!("\n--- Test: status = 'Pending' AND (priority = 'High' OR assigned = 'John') ---");
3688 let result = engine.execute(
3689 table.clone(),
3690 "SELECT * FROM test WHERE status = 'Pending' AND (priority = 'High' OR assigned = 'John')",
3691 );
3692 match result {
3693 Ok(view) => {
3694 println!(
3695 "SUCCESS: Found {} rows with complex logic",
3696 view.row_count()
3697 );
3698 assert_eq!(view.row_count(), 2); }
3700 Err(e) => {
3701 panic!("Complex logic query failed: {e}");
3702 }
3703 }
3704
3705 println!("\n--- Test: NOT (status.Contains('Complete') OR priority = 'Low') ---");
3707 let result = engine.execute(
3708 table.clone(),
3709 "SELECT * FROM test WHERE NOT (status.Contains('Complete') OR priority = 'Low')",
3710 );
3711 match result {
3712 Ok(view) => {
3713 println!(
3714 "SUCCESS: Found {} rows with NOT complex logic",
3715 view.row_count()
3716 );
3717 assert_eq!(view.row_count(), 2); }
3719 Err(e) => {
3720 panic!("NOT complex logic query failed: {e}");
3721 }
3722 }
3723
3724 println!("\n=== Complex logical expressions test complete! ===");
3725 }
3726
3727 #[test]
3728 fn test_mixed_data_types_and_edge_cases() {
3729 let _ = tracing_subscriber::fmt()
3731 .with_max_level(tracing::Level::DEBUG)
3732 .try_init();
3733
3734 let mut table = DataTable::new("test");
3735 table.add_column(DataColumn::new("id"));
3736 table.add_column(DataColumn::new("value"));
3737 table.add_column(DataColumn::new("nullable_field"));
3738
3739 table
3741 .add_row(DataRow::new(vec![
3742 DataValue::Integer(1),
3743 DataValue::String("123.45".to_string()),
3744 DataValue::String("present".to_string()),
3745 ]))
3746 .unwrap();
3747
3748 table
3749 .add_row(DataRow::new(vec![
3750 DataValue::Integer(2),
3751 DataValue::Float(678.90),
3752 DataValue::Null,
3753 ]))
3754 .unwrap();
3755
3756 table
3757 .add_row(DataRow::new(vec![
3758 DataValue::Integer(3),
3759 DataValue::Boolean(true),
3760 DataValue::String("also present".to_string()),
3761 ]))
3762 .unwrap();
3763
3764 table
3765 .add_row(DataRow::new(vec![
3766 DataValue::Integer(4),
3767 DataValue::String("false".to_string()),
3768 DataValue::Null,
3769 ]))
3770 .unwrap();
3771
3772 let table = Arc::new(table);
3773 let engine = QueryEngine::new();
3774
3775 println!("\n=== Testing Mixed Data Types and Edge Cases ===");
3776 println!("Table has {} rows", table.row_count());
3777 for i in 0..table.row_count() {
3778 let value = table.get_value(i, 1);
3779 let nullable = table.get_value(i, 2);
3780 println!("Row {i}: value = {value:?}, nullable_field = {nullable:?}");
3781 }
3782
3783 println!("\n--- Test: value.Contains('true') (boolean to string coercion) ---");
3785 let result = engine.execute(
3786 table.clone(),
3787 "SELECT * FROM test WHERE value.Contains('true')",
3788 );
3789 match result {
3790 Ok(view) => {
3791 println!(
3792 "SUCCESS: Found {} rows with boolean coercion",
3793 view.row_count()
3794 );
3795 assert_eq!(view.row_count(), 1); }
3797 Err(e) => {
3798 panic!("Boolean coercion query failed: {e}");
3799 }
3800 }
3801
3802 println!("\n--- Test: id IN (1, 3) ---");
3804 let result = engine.execute(table.clone(), "SELECT * FROM test WHERE id IN (1, 3)");
3805 match result {
3806 Ok(view) => {
3807 println!("SUCCESS: Found {} rows with IN clause", view.row_count());
3808 assert_eq!(view.row_count(), 2); }
3810 Err(e) => {
3811 panic!("Multiple IN values query failed: {e}");
3812 }
3813 }
3814
3815 println!("\n=== Mixed data types test complete! ===");
3816 }
3817
3818 #[test]
3820 fn test_aggregate_only_single_row() {
3821 let table = create_test_stock_data();
3822 let engine = QueryEngine::new();
3823
3824 let result = engine
3826 .execute(
3827 table.clone(),
3828 "SELECT COUNT(*), MIN(close), MAX(close), AVG(close) FROM stock",
3829 )
3830 .expect("Query should succeed");
3831
3832 assert_eq!(
3833 result.row_count(),
3834 1,
3835 "Aggregate-only query should return exactly 1 row"
3836 );
3837 assert_eq!(result.column_count(), 4, "Should have 4 aggregate columns");
3838
3839 let source = result.source();
3841 let row = source.get_row(0).expect("Should have first row");
3842
3843 assert_eq!(row.values[0], DataValue::Integer(5));
3845
3846 assert_eq!(row.values[1], DataValue::Float(99.5));
3848
3849 assert_eq!(row.values[2], DataValue::Float(105.0));
3851
3852 if let DataValue::Float(avg) = &row.values[3] {
3854 assert!(
3855 (avg - 102.4).abs() < 0.01,
3856 "Average should be approximately 102.4, got {}",
3857 avg
3858 );
3859 } else {
3860 panic!("AVG should return a Float value");
3861 }
3862 }
3863
3864 #[test]
3866 fn test_single_aggregate_single_row() {
3867 let table = create_test_stock_data();
3868 let engine = QueryEngine::new();
3869
3870 let result = engine
3871 .execute(table.clone(), "SELECT COUNT(*) FROM stock")
3872 .expect("Query should succeed");
3873
3874 assert_eq!(
3875 result.row_count(),
3876 1,
3877 "Single aggregate query should return exactly 1 row"
3878 );
3879 assert_eq!(result.column_count(), 1, "Should have 1 column");
3880
3881 let source = result.source();
3882 let row = source.get_row(0).expect("Should have first row");
3883 assert_eq!(row.values[0], DataValue::Integer(5));
3884 }
3885
3886 #[test]
3888 fn test_aggregate_with_where_single_row() {
3889 let table = create_test_stock_data();
3890 let engine = QueryEngine::new();
3891
3892 let result = engine
3894 .execute(
3895 table.clone(),
3896 "SELECT COUNT(*), MIN(close), MAX(close) FROM stock WHERE close >= 103.0",
3897 )
3898 .expect("Query should succeed");
3899
3900 assert_eq!(
3901 result.row_count(),
3902 1,
3903 "Filtered aggregate query should return exactly 1 row"
3904 );
3905 assert_eq!(result.column_count(), 3, "Should have 3 aggregate columns");
3906
3907 let source = result.source();
3908 let row = source.get_row(0).expect("Should have first row");
3909
3910 assert_eq!(row.values[0], DataValue::Integer(2));
3912 assert_eq!(row.values[1], DataValue::Float(103.5)); assert_eq!(row.values[2], DataValue::Float(105.0)); }
3915
3916 #[test]
3917 fn test_not_in_parsing() {
3918 use crate::sql::recursive_parser::Parser;
3919
3920 let query = "SELECT * FROM test WHERE country NOT IN ('CA')";
3921 println!("\n=== Testing NOT IN parsing ===");
3922 println!("Parsing query: {query}");
3923
3924 let mut parser = Parser::new(query);
3925 match parser.parse() {
3926 Ok(statement) => {
3927 println!("Parsed statement: {statement:#?}");
3928 if let Some(where_clause) = statement.where_clause {
3929 println!("WHERE conditions: {:#?}", where_clause.conditions);
3930 if let Some(first_condition) = where_clause.conditions.first() {
3931 println!("First condition expression: {:#?}", first_condition.expr);
3932 }
3933 }
3934 }
3935 Err(e) => {
3936 panic!("Parse error: {e}");
3937 }
3938 }
3939 }
3940
3941 fn create_test_stock_data() -> Arc<DataTable> {
3943 let mut table = DataTable::new("stock");
3944
3945 table.add_column(DataColumn::new("symbol"));
3946 table.add_column(DataColumn::new("close"));
3947 table.add_column(DataColumn::new("volume"));
3948
3949 let test_data = vec![
3951 ("AAPL", 99.5, 1000),
3952 ("AAPL", 101.2, 1500),
3953 ("AAPL", 103.5, 2000),
3954 ("AAPL", 105.0, 1200),
3955 ("AAPL", 102.8, 1800),
3956 ];
3957
3958 for (symbol, close, volume) in test_data {
3959 table
3960 .add_row(DataRow::new(vec![
3961 DataValue::String(symbol.to_string()),
3962 DataValue::Float(close),
3963 DataValue::Integer(volume),
3964 ]))
3965 .expect("Should add row successfully");
3966 }
3967
3968 Arc::new(table)
3969 }
3970}
3971
// Further tests are loaded from `query_engine_tests.rs` via `#[path]`.
// The module is compiled only for test builds (`cfg(test)`).
#[cfg(test)]
#[path = "query_engine_tests.rs"]
mod query_engine_tests;